From 8186b3676b76516cd946bf54158d2b1abc10970b Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Tue, 7 Jan 2025 19:31:32 -0800 Subject: [PATCH 1/9] Add grpc metrics to encoder v2 --- disperser/cmd/encoder/main.go | 1 + disperser/encoder/server_v2.go | 27 +++++++++++++++++++-------- relay/chunk_provider.go | 9 +++++---- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/disperser/cmd/encoder/main.go b/disperser/cmd/encoder/main.go index 3382deedfa..6d9645e35d 100644 --- a/disperser/cmd/encoder/main.go +++ b/disperser/cmd/encoder/main.go @@ -115,6 +115,7 @@ func RunEncoderServer(ctx *cli.Context) error { logger, prover, metrics, + grpcMetrics, ) return server.Start() diff --git a/disperser/encoder/server_v2.go b/disperser/encoder/server_v2.go index bc1b3d0d25..bab53f07f0 100644 --- a/disperser/encoder/server_v2.go +++ b/disperser/encoder/server_v2.go @@ -16,6 +16,7 @@ import ( "github.com/Layr-Labs/eigenda/encoding/rs" "github.com/Layr-Labs/eigenda/relay/chunkstore" "github.com/Layr-Labs/eigensdk-go/logging" + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/reflection" @@ -31,21 +32,30 @@ type EncoderServerV2 struct { logger logging.Logger prover encoding.Prover metrics *Metrics + grpcMetrics *grpcprom.ServerMetrics close func() runningRequests chan struct{} requestPool chan struct{} } -func NewEncoderServerV2(config ServerConfig, blobStore *blobstore.BlobStore, chunkWriter chunkstore.ChunkWriter, logger logging.Logger, prover encoding.Prover, metrics *Metrics) *EncoderServerV2 { +func NewEncoderServerV2( + config ServerConfig, + blobStore *blobstore.BlobStore, + chunkWriter chunkstore.ChunkWriter, + logger logging.Logger, + prover encoding.Prover, + metrics *Metrics, + grpcMetrics *grpcprom.ServerMetrics, +) *EncoderServerV2 { return &EncoderServerV2{ - config: config, - blobStore: blobStore, - chunkWriter: chunkWriter, - logger: logger.With("component", "EncoderServer"), - prover: prover, - metrics: metrics, - + config: config, + blobStore: blobStore, + chunkWriter: chunkWriter, + logger: logger.With("component", "EncoderServerV2"), + prover: prover, + metrics: metrics, + grpcMetrics: grpcMetrics, runningRequests: make(chan struct{}, config.MaxConcurrentRequests), requestPool: make(chan struct{}, config.RequestPoolSize), } @@ -62,6 +72,7 @@ func (s *EncoderServerV2) Start() error { gs := grpc.NewServer() reflection.Register(gs) pb.RegisterEncoderServer(gs, s) + s.grpcMetrics.InitializeMetrics(gs) // Register Server for Health Checks name := pb.Encoder_ServiceDesc.ServiceName diff --git a/relay/chunk_provider.go b/relay/chunk_provider.go index 2f9a33ead7..3009310b71 100644 --- a/relay/chunk_provider.go +++ b/relay/chunk_provider.go @@ -4,14 +4,15 @@ import ( "bytes" "context" "fmt" + "sync" + "time" + v2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigenda/encoding/rs" "github.com/Layr-Labs/eigenda/relay/cache" "github.com/Layr-Labs/eigenda/relay/chunkstore" "github.com/Layr-Labs/eigensdk-go/logging" - "sync" - "time" ) type chunkProvider struct { @@ -110,7 +111,7 @@ func (s *chunkProvider) GetFrames(ctx context.Context, mMap metadataMap) (frameM go func() { frames, err := s.frameCache.Get(ctx, *boundKey) if err != nil { - s.logger.Errorf("Failed to get frames for blob %v: %v", boundKey.blobKey, err) + s.logger.Errorf("Failed to get frames for blob %v: %v", boundKey.blobKey.Hex(), err) completionChannel <- &framesResult{ key: boundKey.blobKey, err: err, @@ -129,7 +130,7 @@ func (s *chunkProvider) GetFrames(ctx context.Context, mMap metadataMap) (frameM for len(fMap) < len(keys) { result := <-completionChannel if result.err != nil { - return nil, fmt.Errorf("error fetching frames for blob %v: %w", result.key, result.err) + return nil, fmt.Errorf("error fetching frames for blob %v: %w", result.key.Hex(), result.err) } fMap[result.key] = result.data } From 9b116eb49d5ce8271f79eb7ae9071c1a131d2799 Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Tue, 7 Jan 2025 19:34:30 -0800 Subject: [PATCH 2/9] Fix test --- disperser/encoder/server_v2_test.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/disperser/encoder/server_v2_test.go b/disperser/encoder/server_v2_test.go index 15d13a9065..667b691412 100644 --- a/disperser/encoder/server_v2_test.go +++ b/disperser/encoder/server_v2_test.go @@ -18,6 +18,7 @@ import ( "github.com/Layr-Labs/eigenda/encoding/kzg/prover" "github.com/Layr-Labs/eigenda/encoding/utils/codec" "github.com/Layr-Labs/eigenda/relay/chunkstore" + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -209,7 +210,12 @@ func createTestComponents(t *testing.T) *testComponents { t.Helper() prover, err := makeTestProver(300000) require.NoError(t, err, "Failed to create prover") - metrics := encoder.NewMetrics(prometheus.NewRegistry(), "9000", logger) + + registry := prometheus.NewRegistry() + metrics := encoder.NewMetrics(registry, "9000", logger) + grpcMetrics := grpcprom.NewServerMetrics() + registry.MustRegister(grpcMetrics) + s3Client := mock.NewS3Client() dynamoDBClient := &mock.MockDynamoDBClient{} blobStore := blobstore.NewBlobStore(s3BucketName, s3Client, logger) @@ -220,7 +226,7 @@ func createTestComponents(t *testing.T) *testComponents { MaxConcurrentRequests: 10, RequestPoolSize: 5, PreventReencoding: true, - }, blobStore, chunkStoreWriter, logger, prover, metrics) + }, blobStore, chunkStoreWriter, logger, prover, metrics, grpcMetrics) return &testComponents{ encoderServer: encoderServer, From bfb1098a8c8481bed138d7acf0e4221c37ddf6d2 Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Tue, 7 Jan 2025 19:46:20 -0800 Subject: [PATCH 3/9] Add interceptor --- disperser/encoder/server_v2.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/disperser/encoder/server_v2.go b/disperser/encoder/server_v2.go index bab53f07f0..628c4c96bc 100644 --- a/disperser/encoder/server_v2.go +++ b/disperser/encoder/server_v2.go @@ -69,7 +69,11 @@ func (s *EncoderServerV2) Start() error { log.Fatalf("Could not start tcp listener: %v", err) } - gs := grpc.NewServer() + gs := grpc.NewServer( + grpc.UnaryInterceptor( + s.grpcMetrics.UnaryServerInterceptor(), + ), + ) reflection.Register(gs) pb.RegisterEncoderServer(gs, s) s.grpcMetrics.InitializeMetrics(gs) From 2f180f7cc66b0584f76ebaba01e0b735c16a001e Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Tue, 7 Jan 2025 21:46:44 -0800 Subject: [PATCH 4/9] Send blob size through request + add more metrics --- disperser/api/grpc/encoder/v2/encoder.pb.go | 80 +++++++++++--------- disperser/api/proto/encoder/v2/encoder.proto | 1 + disperser/controller/encoding_manager.go | 10 ++- disperser/encoder/client_v2.go | 3 +- disperser/encoder/server.go | 2 +- disperser/encoder/server_v2.go | 51 +++++++++---- disperser/encoder_client_v2.go | 2 +- disperser/mock/encoder_v2.go | 2 +- 8 files changed, 98 insertions(+), 53 deletions(-) diff --git a/disperser/api/grpc/encoder/v2/encoder.pb.go b/disperser/api/grpc/encoder/v2/encoder.pb.go index 182c97bfc9..eadd1fb51c 100644 --- a/disperser/api/grpc/encoder/v2/encoder.pb.go +++ b/disperser/api/grpc/encoder/v2/encoder.pb.go @@ -29,6 +29,7 @@ type EncodeBlobRequest struct { BlobKey []byte `protobuf:"bytes,1,opt,name=blob_key,json=blobKey,proto3" json:"blob_key,omitempty"` EncodingParams *EncodingParams `protobuf:"bytes,2,opt,name=encoding_params,json=encodingParams,proto3" json:"encoding_params,omitempty"` + BlobSize uint64 `protobuf:"varint,3,opt,name=blob_size,json=blobSize,proto3" json:"blob_size,omitempty"` } func (x *EncodeBlobRequest) Reset() { @@ -77,6 +78,13 @@ func (x *EncodeBlobRequest) GetEncodingParams() *EncodingParams { return nil } +func (x *EncodeBlobRequest) GetBlobSize() uint64 { + if x != nil { + return x.BlobSize + } + return 0 +} + // EncodingParams specifies how the blob should be encoded into chunks type EncodingParams struct { state protoimpl.MessageState @@ -242,41 +250,43 @@ var File_encoder_v2_encoder_proto protoreflect.FileDescriptor var file_encoder_v2_encoder_proto_rawDesc = []byte{ 0x0a, 0x18, 0x65, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x2f, 0x65, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x65, 0x6e, 0x63, 0x6f, - 0x64, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x22, 0x73, 0x0a, 0x11, 0x45, 0x6e, 0x63, 0x6f, 0x64, 0x65, - 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x62, - 0x6c, 0x6f, 0x62, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x62, - 0x6c, 0x6f, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x43, 0x0a, 0x0f, 0x65, 0x6e, 0x63, 0x6f, 0x64, 0x69, - 0x6e, 0x67, 0x5f, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x1a, 0x2e, 0x65, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x45, 0x6e, 0x63, - 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x52, 0x0e, 0x65, 0x6e, 0x63, - 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x22, 0x52, 0x0a, 0x0e, 0x45, - 0x6e, 0x63, 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x12, 0x21, 0x0a, - 0x0c, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x5f, 0x6c, 0x65, 0x6e, 0x67, 0x74, 0x68, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x04, 0x52, 0x0b, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x4c, 0x65, 0x6e, 0x67, 0x74, 0x68, - 0x12, 0x1d, 0x0a, 0x0a, 0x6e, 0x75, 0x6d, 0x5f, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x73, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x6e, 0x75, 0x6d, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x73, 0x22, - 0x73, 0x0a, 0x0c, 0x46, 0x72, 0x61, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, - 0x33, 0x0a, 0x16, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x5f, 0x73, - 0x69, 0x7a, 0x65, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, - 0x13, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x53, 0x69, 0x7a, 0x65, 0x42, - 0x79, 0x74, 0x65, 0x73, 0x12, 0x2e, 0x0a, 0x13, 0x66, 0x72, 0x61, 0x67, 0x6d, 0x65, 0x6e, 0x74, - 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x0d, 0x52, 0x11, 0x66, 0x72, 0x61, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x69, 0x7a, 0x65, 0x42, - 0x79, 0x74, 0x65, 0x73, 0x22, 0x50, 0x0a, 0x0f, 0x45, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x42, 0x6c, - 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x3d, 0x0a, 0x0d, 0x66, 0x72, 0x61, 0x67, 0x6d, - 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, - 0x2e, 0x65, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x46, 0x72, 0x61, 0x67, - 0x6d, 0x65, 0x6e, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x0c, 0x66, 0x72, 0x61, 0x67, 0x6d, 0x65, - 0x6e, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x32, 0x55, 0x0a, 0x07, 0x45, 0x6e, 0x63, 0x6f, 0x64, 0x65, - 0x72, 0x12, 0x4a, 0x0a, 0x0a, 0x45, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x12, - 0x1d, 0x2e, 0x65, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x45, 0x6e, 0x63, - 0x6f, 0x64, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, - 0x2e, 0x65, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x45, 0x6e, 0x63, 0x6f, - 0x64, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x32, 0x5a, - 0x30, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4c, 0x61, 0x79, 0x72, - 0x2d, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x65, 0x69, 0x67, 0x65, 0x6e, 0x64, 0x61, 0x2f, 0x61, 0x70, - 0x69, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x65, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2f, 0x76, - 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x64, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x22, 0x90, 0x01, 0x0a, 0x11, 0x45, 0x6e, 0x63, 0x6f, 0x64, + 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x19, 0x0a, 0x08, + 0x62, 0x6c, 0x6f, 0x62, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, + 0x62, 0x6c, 0x6f, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x43, 0x0a, 0x0f, 0x65, 0x6e, 0x63, 0x6f, 0x64, + 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x1a, 0x2e, 0x65, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x45, 0x6e, + 0x63, 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x52, 0x0e, 0x65, 0x6e, + 0x63, 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x12, 0x1b, 0x0a, 0x09, + 0x62, 0x6c, 0x6f, 0x62, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, + 0x08, 0x62, 0x6c, 0x6f, 0x62, 0x53, 0x69, 0x7a, 0x65, 0x22, 0x52, 0x0a, 0x0e, 0x45, 0x6e, 0x63, + 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x12, 0x21, 0x0a, 0x0c, 0x63, + 0x68, 0x75, 0x6e, 0x6b, 0x5f, 0x6c, 0x65, 0x6e, 0x67, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x04, 0x52, 0x0b, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x4c, 0x65, 0x6e, 0x67, 0x74, 0x68, 0x12, 0x1d, + 0x0a, 0x0a, 0x6e, 0x75, 0x6d, 0x5f, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x73, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x04, 0x52, 0x09, 0x6e, 0x75, 0x6d, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x73, 0x22, 0x73, 0x0a, + 0x0c, 0x46, 0x72, 0x61, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x33, 0x0a, + 0x16, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x5f, 0x73, 0x69, 0x7a, + 0x65, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x13, 0x74, + 0x6f, 0x74, 0x61, 0x6c, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x53, 0x69, 0x7a, 0x65, 0x42, 0x79, 0x74, + 0x65, 0x73, 0x12, 0x2e, 0x0a, 0x13, 0x66, 0x72, 0x61, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x73, + 0x69, 0x7a, 0x65, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, + 0x11, 0x66, 0x72, 0x61, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x69, 0x7a, 0x65, 0x42, 0x79, 0x74, + 0x65, 0x73, 0x22, 0x50, 0x0a, 0x0f, 0x45, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x42, 0x6c, 0x6f, 0x62, + 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x3d, 0x0a, 0x0d, 0x66, 0x72, 0x61, 0x67, 0x6d, 0x65, 0x6e, + 0x74, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x65, + 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x46, 0x72, 0x61, 0x67, 0x6d, 0x65, + 0x6e, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x0c, 0x66, 0x72, 0x61, 0x67, 0x6d, 0x65, 0x6e, 0x74, + 0x49, 0x6e, 0x66, 0x6f, 0x32, 0x55, 0x0a, 0x07, 0x45, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x12, + 0x4a, 0x0a, 0x0a, 0x45, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x12, 0x1d, 0x2e, + 0x65, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x45, 0x6e, 0x63, 0x6f, 0x64, + 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x65, + 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x45, 0x6e, 0x63, 0x6f, 0x64, 0x65, + 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x32, 0x5a, 0x30, 0x67, + 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4c, 0x61, 0x79, 0x72, 0x2d, 0x4c, + 0x61, 0x62, 0x73, 0x2f, 0x65, 0x69, 0x67, 0x65, 0x6e, 0x64, 0x61, 0x2f, 0x61, 0x70, 0x69, 0x2f, + 0x67, 0x72, 0x70, 0x63, 0x2f, 0x65, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/disperser/api/proto/encoder/v2/encoder.proto b/disperser/api/proto/encoder/v2/encoder.proto index 679e716be4..20a8fcfe1c 100644 --- a/disperser/api/proto/encoder/v2/encoder.proto +++ b/disperser/api/proto/encoder/v2/encoder.proto @@ -15,6 +15,7 @@ service Encoder { message EncodeBlobRequest { bytes blob_key = 1; EncodingParams encoding_params = 2; + uint64 blob_size = 3; } // EncodingParams specifies how the blob should be encoded into chunks diff --git a/disperser/controller/encoding_manager.go b/disperser/controller/encoding_manager.go index 2dd6fa85ff..cbd9f444d1 100644 --- a/disperser/controller/encoding_manager.go +++ b/disperser/controller/encoding_manager.go @@ -20,6 +20,7 @@ import ( "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigensdk-go/logging" + "google.golang.org/grpc/metadata" ) var errNoBlobsToEncode = errors.New("no blobs to encode") @@ -269,11 +270,18 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error { } func (e *EncodingManager) encodeBlob(ctx context.Context, blobKey corev2.BlobKey, blob *v2.BlobMetadata, blobParams *core.BlobVersionParameters) (*encoding.FragmentInfo, error) { + // Add headers for routing + md := metadata.New(map[string]string{ + "content-type": "application/grpc", + "x-blob-size": fmt.Sprintf("%d", blob.BlobSize), + }) + ctx = metadata.NewOutgoingContext(ctx, md) + encodingParams, err := blob.BlobHeader.GetEncodingParams(blobParams) if err != nil { return nil, fmt.Errorf("failed to get encoding params: %w", err) } - return e.encodingClient.EncodeBlob(ctx, blobKey, encodingParams) + return e.encodingClient.EncodeBlob(ctx, blobKey, encodingParams, blob.BlobSize) } func (e *EncodingManager) refreshBlobVersionParams(ctx context.Context) error { diff --git a/disperser/encoder/client_v2.go b/disperser/encoder/client_v2.go index 815bc9f85e..b8db40f916 100644 --- a/disperser/encoder/client_v2.go +++ b/disperser/encoder/client_v2.go @@ -22,7 +22,7 @@ func NewEncoderClientV2(addr string) (disperser.EncoderClientV2, error) { }, nil } -func (c *clientV2) EncodeBlob(ctx context.Context, blobKey corev2.BlobKey, encodingParams encoding.EncodingParams) (*encoding.FragmentInfo, error) { +func (c *clientV2) EncodeBlob(ctx context.Context, blobKey corev2.BlobKey, encodingParams encoding.EncodingParams, blobSize uint64) (*encoding.FragmentInfo, error) { // Establish connection conn, err := grpc.NewClient( c.addr, @@ -43,6 +43,7 @@ func (c *clientV2) EncodeBlob(ctx context.Context, blobKey corev2.BlobKey, encod ChunkLength: encodingParams.ChunkLength, NumChunks: encodingParams.NumChunks, }, + BlobSize: blobSize, } // Make the RPC call diff --git a/disperser/encoder/server.go b/disperser/encoder/server.go index 9cab207a30..64e6b726e1 100644 --- a/disperser/encoder/server.go +++ b/disperser/encoder/server.go @@ -111,7 +111,7 @@ func (s *EncoderServer) EncodeBlob(ctx context.Context, req *pb.EncodeBlobReques s.metrics.ObserveQueue(s.queueStats) s.queueLock.Unlock() default: - s.metrics.IncrementRateLimitedBlobRequestNum(len(req.GetData())) + s.metrics.IncrementRateLimitedBlobRequestNum(blobSize) s.logger.Warn("rate limiting as request pool is full", "requestPoolSize", s.config.RequestPoolSize, "maxConcurrentRequests", s.config.MaxConcurrentRequests) return nil, errors.New("too many requests") } diff --git a/disperser/encoder/server_v2.go b/disperser/encoder/server_v2.go index 628c4c96bc..22b425a1da 100644 --- a/disperser/encoder/server_v2.go +++ b/disperser/encoder/server_v2.go @@ -2,15 +2,18 @@ package encoder import ( "context" + "errors" "fmt" "log" "net" + "sync" "time" "github.com/Layr-Labs/eigenda/common/healthcheck" corev2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/disperser" pb "github.com/Layr-Labs/eigenda/disperser/api/grpc/encoder/v2" + "github.com/Layr-Labs/eigenda/disperser/common" "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigenda/encoding/rs" @@ -36,7 +39,10 @@ type EncoderServerV2 struct { close func() runningRequests chan struct{} - requestPool chan struct{} + requestPool chan blobRequest + + queueStats map[string]int + queueLock sync.Mutex } func NewEncoderServerV2( @@ -48,6 +54,8 @@ func NewEncoderServerV2( metrics *Metrics, grpcMetrics *grpcprom.ServerMetrics, ) *EncoderServerV2 { + metrics.SetQueueCapacity(config.RequestPoolSize) + return &EncoderServerV2{ config: config, blobStore: blobStore, @@ -57,7 +65,8 @@ func NewEncoderServerV2( metrics: metrics, grpcMetrics: grpcMetrics, runningRequests: make(chan struct{}, config.MaxConcurrentRequests), - requestPool: make(chan struct{}, config.RequestPoolSize), + requestPool: make(chan blobRequest, config.RequestPoolSize), + queueStats: make(map[string]int), } } @@ -100,30 +109,36 @@ func (s *EncoderServerV2) EncodeBlob(ctx context.Context, req *pb.EncodeBlobRequ s.metrics.ObserveLatency("total", time.Since(totalStart)) }() + blobSize := req.GetBlobSize() + sizeBucket := common.BlobSizeBucket(int(blobSize)) + // Rate limit select { - case s.requestPool <- struct{}{}: + case s.requestPool <- blobRequest{blobSizeByte: int(blobSize)}: + s.queueLock.Lock() + s.queueStats[sizeBucket]++ + s.metrics.ObserveQueue(s.queueStats) + s.queueLock.Unlock() default: - // TODO: Now that we no longer pass the data directly, should we pass in blob size as part of the request? - s.metrics.IncrementRateLimitedBlobRequestNum(1) + s.metrics.IncrementRateLimitedBlobRequestNum(int(blobSize)) s.logger.Warn("rate limiting as request pool is full", "requestPoolSize", s.config.RequestPoolSize, "maxConcurrentRequests", s.config.MaxConcurrentRequests) - return nil, status.Error(codes.ResourceExhausted, "request pool is full") + return nil, errors.New("too many requests") } // Limit the number of concurrent requests s.runningRequests <- struct{}{} defer s.popRequest() if ctx.Err() != nil { - s.metrics.IncrementCanceledBlobRequestNum(1) + s.metrics.IncrementCanceledBlobRequestNum(int(blobSize)) return nil, status.Error(codes.Canceled, "request was canceled") } s.metrics.ObserveLatency("queuing", time.Since(totalStart)) reply, err := s.handleEncodingToChunkStore(ctx, req) if err != nil { - s.metrics.IncrementFailedBlobRequestNum(1) + s.metrics.IncrementFailedBlobRequestNum(int(blobSize)) } else { - s.metrics.IncrementSuccessfulBlobRequestNum(1) + s.metrics.IncrementSuccessfulBlobRequestNum(int(blobSize)) } return reply, err @@ -161,6 +176,7 @@ func (s *EncoderServerV2) handleEncodingToChunkStore(ctx context.Context, req *p if len(data) == 0 { return nil, status.Error(codes.NotFound, "blob length is zero") } + s.metrics.ObserveLatency("s3_download", time.Since(fetchStart)) s.logger.Info("fetched blob", "duration", time.Since(fetchStart).String()) // Encode the data @@ -170,15 +186,19 @@ func (s *EncoderServerV2) handleEncodingToChunkStore(ctx context.Context, req *p s.logger.Error("failed to encode frames", "error", err) return nil, status.Errorf(codes.Internal, "encoding failed: %v", err) } + s.metrics.ObserveLatency("encoding", time.Since(encodingStart)) s.logger.Info("encoding frames", "duration", time.Since(encodingStart).String()) - // Process and store results return s.processAndStoreResults(ctx, blobKey, frames) } func (s *EncoderServerV2) popRequest() { - <-s.requestPool + blobRequest := <-s.requestPool <-s.runningRequests + s.queueLock.Lock() + s.queueStats[common.BlobSizeBucket(blobRequest.blobSizeByte)]-- + s.metrics.ObserveQueue(s.queueStats) + s.queueLock.Unlock() } func (s *EncoderServerV2) validateAndParseRequest(req *pb.EncodeBlobRequest) (corev2.BlobKey, encoding.EncodingParams, error) { @@ -229,13 +249,17 @@ func (s *EncoderServerV2) validateAndParseRequest(req *pb.EncodeBlobRequest) (co } func (s *EncoderServerV2) processAndStoreResults(ctx context.Context, blobKey corev2.BlobKey, frames []*encoding.Frame) (*pb.EncodeBlobReply, error) { - proofs, coeffs := extractProofsAndCoeffs(frames) - // Store proofs storeStart := time.Now() + defer func() { + s.metrics.ObserveLatency("process_and_store_results", time.Since(storeStart)) + }() + + proofs, coeffs := extractProofsAndCoeffs(frames) if err := s.chunkWriter.PutChunkProofs(ctx, blobKey, proofs); err != nil { return nil, status.Errorf(codes.Internal, "failed to upload chunk proofs: %v", err) } + s.metrics.ObserveLatency("s3_upload_proofs", time.Since(storeStart)) s.logger.Info("stored proofs", "duration", time.Since(storeStart).String()) // Store coefficients @@ -244,6 +268,7 @@ func (s *EncoderServerV2) processAndStoreResults(ctx context.Context, blobKey co if err != nil { return nil, status.Errorf(codes.Internal, "failed to upload chunk coefficients: %v", err) } + s.metrics.ObserveLatency("s3_upload_coefficients", time.Since(coeffStart)) s.logger.Info("stored coefficients", "duration", time.Since(coeffStart).String()) return &pb.EncodeBlobReply{ diff --git a/disperser/encoder_client_v2.go b/disperser/encoder_client_v2.go index a88031996e..a24dcce008 100644 --- a/disperser/encoder_client_v2.go +++ b/disperser/encoder_client_v2.go @@ -8,5 +8,5 @@ import ( ) type EncoderClientV2 interface { - EncodeBlob(ctx context.Context, blobKey corev2.BlobKey, encodingParams encoding.EncodingParams) (*encoding.FragmentInfo, error) + EncodeBlob(ctx context.Context, blobKey corev2.BlobKey, encodingParams encoding.EncodingParams, blobSize uint64) (*encoding.FragmentInfo, error) } diff --git a/disperser/mock/encoder_v2.go b/disperser/mock/encoder_v2.go index 7ef5d668b7..403a397cad 100644 --- a/disperser/mock/encoder_v2.go +++ b/disperser/mock/encoder_v2.go @@ -19,7 +19,7 @@ func NewMockEncoderClientV2() *MockEncoderClientV2 { return &MockEncoderClientV2{} } -func (m *MockEncoderClientV2) EncodeBlob(ctx context.Context, blobKey corev2.BlobKey, encodingParams encoding.EncodingParams) (*encoding.FragmentInfo, error) { +func (m *MockEncoderClientV2) EncodeBlob(ctx context.Context, blobKey corev2.BlobKey, encodingParams encoding.EncodingParams, blobSize uint64) (*encoding.FragmentInfo, error) { args := m.Called() var fragmentInfo *encoding.FragmentInfo if args.Get(0) != nil { From 70be6ecd93d25f85ee7ba1952050575fe612656c Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Wed, 8 Jan 2025 11:48:58 -0800 Subject: [PATCH 5/9] Update request pool to request queue --- disperser/cmd/encoder/flags/flags.go | 8 ++++++++ disperser/encoder/config.go | 1 + disperser/encoder/server_v2.go | 12 ++++++------ 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/disperser/cmd/encoder/flags/flags.go b/disperser/cmd/encoder/flags/flags.go index dedb228d6b..a291b064fc 100644 --- a/disperser/cmd/encoder/flags/flags.go +++ b/disperser/cmd/encoder/flags/flags.go @@ -62,6 +62,13 @@ var ( Value: 32, EnvVar: common.PrefixEnvVar(envVarPrefix, "REQUEST_POOL_SIZE"), } + RequestQueueSizeFlag = cli.IntFlag{ + Name: common.PrefixFlag(FlagPrefix, "request-queue-size"), + Usage: "maximum number of requests in the request queue", + Required: false, + Value: 32, + EnvVar: common.PrefixEnvVar(envVarPrefix, "REQUEST_QUEUE_SIZE"), + } EnableGnarkChunkEncodingFlag = cli.BoolFlag{ Name: common.PrefixFlag(FlagPrefix, "enable-gnark-chunk-encoding"), Usage: "if true, will produce chunks in Gnark, instead of Gob", @@ -111,6 +118,7 @@ var optionalFlags = []cli.Flag{ EnableMetrics, MaxConcurrentRequestsFlag, RequestPoolSizeFlag, + RequestQueueSizeFlag, EnableGnarkChunkEncodingFlag, EncoderVersionFlag, S3BucketNameFlag, diff --git a/disperser/encoder/config.go b/disperser/encoder/config.go index 168f4185dc..2ff7f4f18f 100644 --- a/disperser/encoder/config.go +++ b/disperser/encoder/config.go @@ -8,6 +8,7 @@ type ServerConfig struct { GrpcPort string MaxConcurrentRequests int RequestPoolSize int + RequestQueueSize int EnableGnarkChunkEncoding bool PreventReencoding bool Backend string diff --git a/disperser/encoder/server_v2.go b/disperser/encoder/server_v2.go index 22b425a1da..947ad20b29 100644 --- a/disperser/encoder/server_v2.go +++ b/disperser/encoder/server_v2.go @@ -39,7 +39,7 @@ type EncoderServerV2 struct { close func() runningRequests chan struct{} - requestPool chan blobRequest + requestQueue chan blobRequest queueStats map[string]int queueLock sync.Mutex @@ -54,7 +54,7 @@ func NewEncoderServerV2( metrics *Metrics, grpcMetrics *grpcprom.ServerMetrics, ) *EncoderServerV2 { - metrics.SetQueueCapacity(config.RequestPoolSize) + metrics.SetQueueCapacity(config.RequestQueueSize) return &EncoderServerV2{ config: config, @@ -65,7 +65,7 @@ func NewEncoderServerV2( metrics: metrics, grpcMetrics: grpcMetrics, runningRequests: make(chan struct{}, config.MaxConcurrentRequests), - requestPool: make(chan blobRequest, config.RequestPoolSize), + requestQueue: make(chan blobRequest, config.RequestQueueSize), queueStats: make(map[string]int), } } @@ -114,14 +114,14 @@ func (s *EncoderServerV2) EncodeBlob(ctx context.Context, req *pb.EncodeBlobRequ // Rate limit select { - case s.requestPool <- blobRequest{blobSizeByte: int(blobSize)}: + case s.requestQueue <- blobRequest{blobSizeByte: int(blobSize)}: s.queueLock.Lock() s.queueStats[sizeBucket]++ s.metrics.ObserveQueue(s.queueStats) s.queueLock.Unlock() default: s.metrics.IncrementRateLimitedBlobRequestNum(int(blobSize)) - s.logger.Warn("rate limiting as request pool is full", "requestPoolSize", s.config.RequestPoolSize, "maxConcurrentRequests", s.config.MaxConcurrentRequests) + s.logger.Warn("rate limiting as request queue is full", "requestQueueSize", s.config.RequestQueueSize, "maxConcurrentRequests", s.config.MaxConcurrentRequests) return nil, errors.New("too many requests") } @@ -193,7 +193,7 @@ func (s *EncoderServerV2) handleEncodingToChunkStore(ctx context.Context, req *p } func (s *EncoderServerV2) popRequest() { - blobRequest := <-s.requestPool + blobRequest := <-s.requestQueue <-s.runningRequests s.queueLock.Lock() s.queueStats[common.BlobSizeBucket(blobRequest.blobSizeByte)]-- From 608a269358c5beb651ef8874ab411eea1cffec9b Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Wed, 8 Jan 2025 11:58:14 -0800 Subject: [PATCH 6/9] Update test --- disperser/encoder/server_v2_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/disperser/encoder/server_v2_test.go b/disperser/encoder/server_v2_test.go index 667b691412..ae334c0542 100644 --- a/disperser/encoder/server_v2_test.go +++ b/disperser/encoder/server_v2_test.go @@ -224,7 +224,7 @@ func createTestComponents(t *testing.T) *testComponents { encoderServer := encoder.NewEncoderServerV2(encoder.ServerConfig{ GrpcPort: "8080", MaxConcurrentRequests: 10, - RequestPoolSize: 5, + RequestQueueSize: 5, PreventReencoding: true, }, blobStore, chunkStoreWriter, logger, prover, metrics, grpcMetrics) From fb51bbe558a43b424e907519c5daeb4422b8d428 Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Wed, 8 Jan 2025 14:45:33 -0800 Subject: [PATCH 7/9] Add config --- inabox/deploy/config.go | 2 ++ inabox/deploy/env_vars.go | 8 +++----- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/inabox/deploy/config.go b/inabox/deploy/config.go index 1050759bb8..c3e9f687fb 100644 --- a/inabox/deploy/config.go +++ b/inabox/deploy/config.go @@ -288,6 +288,7 @@ func (env *Config) generateEncoderVars(ind int, grpcPort string) EncoderVars { DISPERSER_ENCODER_NUM_WORKERS: fmt.Sprint(runtime.GOMAXPROCS(0)), DISPERSER_ENCODER_MAX_CONCURRENT_REQUESTS: "16", DISPERSER_ENCODER_REQUEST_POOL_SIZE: "32", + DISPERSER_ENCODER_REQUEST_QUEUE_SIZE: "32", } env.applyDefaults(&v, "DISPERSER_ENCODER", "enc", ind) @@ -314,6 +315,7 @@ func (env *Config) generateEncoderV2Vars(ind int, grpcPort string) EncoderVars { DISPERSER_ENCODER_REQUEST_POOL_SIZE: "32", DISPERSER_ENCODER_ENCODER_VERSION: "2", DISPERSER_ENCODER_S3_BUCKET_NAME: "test-eigenda-blobstore", + DISPERSER_ENCODER_REQUEST_QUEUE_SIZE: "32", } env.applyDefaults(&v, "DISPERSER_ENCODER", "enc", ind) diff --git a/inabox/deploy/env_vars.go b/inabox/deploy/env_vars.go index 19ee4b9df4..52f3fd1167 100644 --- a/inabox/deploy/env_vars.go +++ b/inabox/deploy/env_vars.go @@ -265,6 +265,8 @@ type EncoderVars struct { DISPERSER_ENCODER_REQUEST_POOL_SIZE string + DISPERSER_ENCODER_REQUEST_QUEUE_SIZE string + DISPERSER_ENCODER_ENABLE_GNARK_CHUNK_ENCODING string DISPERSER_ENCODER_ENCODER_VERSION string @@ -493,11 +495,9 @@ type RetrieverVars struct { RETRIEVER_NUM_CONNECTIONS string - RETRIEVER_DATA_DIR string - RETRIEVER_METRICS_HTTP_PORT string - RETRIEVER_USE_GRAPH string + RETRIEVER_EIGENDA_VERSION string RETRIEVER_G1_PATH string @@ -535,8 +535,6 @@ type RetrieverVars struct { RETRIEVER_LOG_FORMAT string - RETRIEVER_INDEXER_PULL_INTERVAL string - RETRIEVER_GRAPH_URL string RETRIEVER_GRAPH_BACKOFF string From 5c571ac876908cec22415ce0fe46f0b74bfd9e49 Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Wed, 8 Jan 2025 14:51:30 -0800 Subject: [PATCH 8/9] rebase --- inabox/deploy/config.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/inabox/deploy/config.go b/inabox/deploy/config.go index c3e9f687fb..3e86aa327f 100644 --- a/inabox/deploy/config.go +++ b/inabox/deploy/config.go @@ -467,8 +467,6 @@ func (env *Config) generateRetrieverVars(ind int, key string, graphUrl, logPath, RETRIEVER_GRAPH_URL: graphUrl, RETRIEVER_GRAPH_BACKOFF: "1s", RETRIEVER_GRAPH_MAX_RETRIES: "3", - - RETRIEVER_INDEXER_PULL_INTERVAL: "1s", } v.RETRIEVER_G2_PATH = "" From c8afad534534c81224ca7fa41f89e15eb2f275d9 Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Wed, 8 Jan 2025 15:09:16 -0800 Subject: [PATCH 9/9] fix config --- disperser/cmd/encoder/config.go | 1 + 1 file changed, 1 insertion(+) diff --git a/disperser/cmd/encoder/config.go b/disperser/cmd/encoder/config.go index d69c2e6857..038712ad31 100644 --- a/disperser/cmd/encoder/config.go +++ b/disperser/cmd/encoder/config.go @@ -56,6 +56,7 @@ func NewConfig(ctx *cli.Context) (Config, error) { GrpcPort: ctx.GlobalString(flags.GrpcPortFlag.Name), MaxConcurrentRequests: ctx.GlobalInt(flags.MaxConcurrentRequestsFlag.Name), RequestPoolSize: ctx.GlobalInt(flags.RequestPoolSizeFlag.Name), + RequestQueueSize: ctx.GlobalInt(flags.RequestQueueSizeFlag.Name), EnableGnarkChunkEncoding: ctx.Bool(flags.EnableGnarkChunkEncodingFlag.Name), PreventReencoding: ctx.Bool(flags.PreventReencodingFlag.Name), Backend: ctx.String(flags.BackendFlag.Name),