From 28b33686618e220b61973c685331c3ca5e489239 Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Wed, 8 Jan 2025 01:19:49 +0000 Subject: [PATCH] Add support for fetch all operators' responses for a batch --- .../v2/blobstore/dynamo_metadata_store.go | 29 +++++++++ .../blobstore/dynamo_metadata_store_test.go | 37 ++++++++++++ disperser/dataapi/docs/v1/V1_docs.go | 18 ------ disperser/dataapi/docs/v1/V1_swagger.json | 18 ------ disperser/dataapi/docs/v1/V1_swagger.yaml | 12 ---- disperser/dataapi/docs/v2/V2_docs.go | 13 ++-- disperser/dataapi/docs/v2/V2_swagger.json | 13 ++-- disperser/dataapi/docs/v2/V2_swagger.yaml | 12 ++-- disperser/dataapi/v2/server_v2.go | 60 ++++++++++++------- disperser/dataapi/v2/server_v2_test.go | 47 +++++++++++++-- 10 files changed, 169 insertions(+), 90 deletions(-) diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store.go b/disperser/common/v2/blobstore/dynamo_metadata_store.go index 0fff046ca..b33c2adb0 100644 --- a/disperser/common/v2/blobstore/dynamo_metadata_store.go +++ b/disperser/common/v2/blobstore/dynamo_metadata_store.go @@ -480,6 +480,35 @@ func (s *BlobMetadataStore) GetDispersalResponse(ctx context.Context, batchHeade return res, nil } +func (s *BlobMetadataStore) GetDispersalResponses(ctx context.Context, batchHeaderHash [32]byte) ([]*corev2.DispersalResponse, error) { + items, err := s.dynamoDBClient.Query(ctx, s.tableName, "PK = :pk AND begins_with(SK, :prefix)", commondynamodb.ExpressionValues{ + ":pk": &types.AttributeValueMemberS{ + Value: dispersalKeyPrefix + hex.EncodeToString(batchHeaderHash[:]), + }, + ":prefix": &types.AttributeValueMemberS{ + Value: dispersalResponseSKPrefix, + }, + }) + + if err != nil { + return nil, err + } + + if len(items) == 0 { + return nil, fmt.Errorf("%w: dispersal responses not found for batch header hash %x", common.ErrMetadataNotFound, batchHeaderHash) + } + + responses := make([]*corev2.DispersalResponse, len(items)) + for i, item := range items { + responses[i], err = UnmarshalDispersalResponse(item) + if err != nil { + return nil, err + } + } + + return responses, nil +} + func (s *BlobMetadataStore) PutBatchHeader(ctx context.Context, batchHeader *corev2.BatchHeader) error { item, err := MarshalBatchHeader(batchHeader) if err != nil { diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go index 7feb001ea..7fe957b06 100644 --- a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go +++ b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go @@ -336,15 +336,52 @@ func TestBlobMetadataStoreDispersals(t *testing.T) { err = blobMetadataStore.PutDispersalResponse(ctx, dispersalResponse) assert.ErrorIs(t, err, common.ErrAlreadyExists) + // the other operator's response for the same batch + opID2 := core.OperatorID{2, 3} + dispersalRequest2 := &corev2.DispersalRequest{ + OperatorID: opID2, + OperatorAddress: gethcommon.HexToAddress("0x2234567"), + Socket: "socket", + DispersedAt: uint64(time.Now().UnixNano()), + BatchHeader: corev2.BatchHeader{ + BatchRoot: [32]byte{1, 2, 3}, + ReferenceBlockNumber: 100, + }, + } + err = blobMetadataStore.PutDispersalRequest(ctx, dispersalRequest2) + assert.NoError(t, err) + dispersalResponse2 := &corev2.DispersalResponse{ + DispersalRequest: dispersalRequest2, + RespondedAt: uint64(time.Now().UnixNano()), + Signature: [32]byte{1, 1, 1}, + Error: "", + } + err = blobMetadataStore.PutDispersalResponse(ctx, dispersalResponse2) + assert.NoError(t, err) + + responses, err := blobMetadataStore.GetDispersalResponses(ctx, bhh) + assert.NoError(t, err) + assert.Equal(t, 2, len(responses)) + assert.Equal(t, dispersalResponse, responses[0]) + assert.Equal(t, dispersalResponse2, responses[1]) + deleteItems(t, []commondynamodb.Key{ { "PK": &types.AttributeValueMemberS{Value: "BatchHeader#" + hex.EncodeToString(bhh[:])}, "SK": &types.AttributeValueMemberS{Value: "DispersalRequest#" + opID.Hex()}, }, + { + "PK": &types.AttributeValueMemberS{Value: "BatchHeader#" + hex.EncodeToString(bhh[:])}, + "SK": &types.AttributeValueMemberS{Value: "DispersalRequest#" + opID2.Hex()}, + }, { "PK": &types.AttributeValueMemberS{Value: "BatchHeader#" + hex.EncodeToString(bhh[:])}, "SK": &types.AttributeValueMemberS{Value: "DispersalResponse#" + opID.Hex()}, }, + { + "PK": &types.AttributeValueMemberS{Value: "BatchHeader#" + hex.EncodeToString(bhh[:])}, + "SK": &types.AttributeValueMemberS{Value: "DispersalResponse#" + opID2.Hex()}, + }, }) } diff --git a/disperser/dataapi/docs/v1/V1_docs.go b/disperser/dataapi/docs/v1/V1_docs.go index 637b56b86..82362ff9b 100644 --- a/disperser/dataapi/docs/v1/V1_docs.go +++ b/disperser/dataapi/docs/v1/V1_docs.go @@ -1126,12 +1126,6 @@ const docTemplateV1 = `{ "items": { "type": "integer" } - }, - "y": { - "type": "array", - "items": { - "type": "integer" - } } } }, @@ -1140,9 +1134,6 @@ const docTemplateV1 = `{ "properties": { "x": { "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2" - }, - "y": { - "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2" } } }, @@ -1151,9 +1142,6 @@ const docTemplateV1 = `{ "properties": { "x": { "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2" - }, - "y": { - "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2" } } }, @@ -1184,12 +1172,6 @@ const docTemplateV1 = `{ "items": { "type": "integer" } - }, - "a1": { - "type": "array", - "items": { - "type": "integer" - } } } }, diff --git a/disperser/dataapi/docs/v1/V1_swagger.json b/disperser/dataapi/docs/v1/V1_swagger.json index 052eeafeb..1fd39fb5b 100644 --- a/disperser/dataapi/docs/v1/V1_swagger.json +++ b/disperser/dataapi/docs/v1/V1_swagger.json @@ -1122,12 +1122,6 @@ "items": { "type": "integer" } - }, - "y": { - "type": "array", - "items": { - "type": "integer" - } } } }, @@ -1136,9 +1130,6 @@ "properties": { "x": { "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2" - }, - "y": { - "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2" } } }, @@ -1147,9 +1138,6 @@ "properties": { "x": { "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2" - }, - "y": { - "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2" } } }, @@ -1180,12 +1168,6 @@ "items": { "type": "integer" } - }, - "a1": { - "type": "array", - "items": { - "type": "integer" - } } } }, diff --git a/disperser/dataapi/docs/v1/V1_swagger.yaml b/disperser/dataapi/docs/v1/V1_swagger.yaml index ea9a47bd3..bc94a6c86 100644 --- a/disperser/dataapi/docs/v1/V1_swagger.yaml +++ b/disperser/dataapi/docs/v1/V1_swagger.yaml @@ -253,24 +253,16 @@ definitions: items: type: integer type: array - "y": - items: - type: integer - type: array type: object encoding.G2Commitment: properties: x: $ref: '#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2' - "y": - $ref: '#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2' type: object encoding.LengthProof: properties: x: $ref: '#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2' - "y": - $ref: '#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2' type: object github_com_Layr-Labs_eigenda_disperser.BlobStatus: enum: @@ -294,10 +286,6 @@ definitions: items: type: integer type: array - a1: - items: - type: integer - type: array type: object semver.SemverMetrics: properties: diff --git a/disperser/dataapi/docs/v2/V2_docs.go b/disperser/dataapi/docs/v2/V2_docs.go index 9df5d7464..e078e690c 100644 --- a/disperser/dataapi/docs/v2/V2_docs.go +++ b/disperser/dataapi/docs/v2/V2_docs.go @@ -445,7 +445,7 @@ const docTemplateV2 = `{ }, { "type": "string", - "description": "Operator ID in hex string", + "description": "Operator ID in hex string [default: all operators if unspecified]", "name": "operator_id", "in": "query" } @@ -454,7 +454,7 @@ const docTemplateV2 = `{ "200": { "description": "OK", "schema": { - "$ref": "#/definitions/v2.OperatorDispersalResponse" + "$ref": "#/definitions/v2.OperatorDispersalResponses" } }, "400": { @@ -917,11 +917,14 @@ const docTemplateV2 = `{ } } }, - "v2.OperatorDispersalResponse": { + "v2.OperatorDispersalResponses": { "type": "object", "properties": { - "operator_dispersal_response": { - "$ref": "#/definitions/v2.DispersalResponse" + "operator_dispersal_responses": { + "type": "array", + "items": { + "$ref": "#/definitions/v2.DispersalResponse" + } } } }, diff --git a/disperser/dataapi/docs/v2/V2_swagger.json b/disperser/dataapi/docs/v2/V2_swagger.json index 3323d8512..2de5d3ab2 100644 --- a/disperser/dataapi/docs/v2/V2_swagger.json +++ b/disperser/dataapi/docs/v2/V2_swagger.json @@ -442,7 +442,7 @@ }, { "type": "string", - "description": "Operator ID in hex string", + "description": "Operator ID in hex string [default: all operators if unspecified]", "name": "operator_id", "in": "query" } @@ -451,7 +451,7 @@ "200": { "description": "OK", "schema": { - "$ref": "#/definitions/v2.OperatorDispersalResponse" + "$ref": "#/definitions/v2.OperatorDispersalResponses" } }, "400": { @@ -914,11 +914,14 @@ } } }, - "v2.OperatorDispersalResponse": { + "v2.OperatorDispersalResponses": { "type": "object", "properties": { - "operator_dispersal_response": { - "$ref": "#/definitions/v2.DispersalResponse" + "operator_dispersal_responses": { + "type": "array", + "items": { + "$ref": "#/definitions/v2.DispersalResponse" + } } } }, diff --git a/disperser/dataapi/docs/v2/V2_swagger.yaml b/disperser/dataapi/docs/v2/V2_swagger.yaml index 8d0b91824..ffc1c004d 100644 --- a/disperser/dataapi/docs/v2/V2_swagger.yaml +++ b/disperser/dataapi/docs/v2/V2_swagger.yaml @@ -300,10 +300,12 @@ definitions: $ref: '#/definitions/big.Int' type: object type: object - v2.OperatorDispersalResponse: + v2.OperatorDispersalResponses: properties: - operator_dispersal_response: - $ref: '#/definitions/v2.DispersalResponse' + operator_dispersal_responses: + items: + $ref: '#/definitions/v2.DispersalResponse' + type: array type: object v2.OperatorPortCheckResponse: properties: @@ -559,7 +561,7 @@ paths: name: batch_header_hash required: true type: string - - description: Operator ID in hex string + - description: 'Operator ID in hex string [default: all operators if unspecified]' in: query name: operator_id type: string @@ -569,7 +571,7 @@ paths: "200": description: OK schema: - $ref: '#/definitions/v2.OperatorDispersalResponse' + $ref: '#/definitions/v2.OperatorDispersalResponses' "400": description: 'error: Bad request' schema: diff --git a/disperser/dataapi/v2/server_v2.go b/disperser/dataapi/v2/server_v2.go index 1b1258656..160174725 100644 --- a/disperser/dataapi/v2/server_v2.go +++ b/disperser/dataapi/v2/server_v2.go @@ -86,8 +86,9 @@ type ( StakeRankedOperators map[string][]*OperatorStake `json:"stake_ranked_operators"` } - OperatorDispersalResponse struct { - Response *corev2.DispersalResponse `json:"operator_dispersal_response"` + // Operators' responses for a batch + OperatorDispersalResponses struct { + Responses []*corev2.DispersalResponse `json:"operator_dispersal_responses"` } OperatorPortCheckResponse struct { @@ -194,7 +195,7 @@ func (s *ServerV2) Start() error { operators.GET("/stake", s.FetchOperatorsStake) operators.GET("/nodeinfo", s.FetchOperatorsNodeInfo) operators.GET("/reachability", s.CheckOperatorsReachability) - operators.GET("/response/:batch_header_hash", s.FetchOperatorResponse) + operators.GET("/response/:batch_header_hash", s.FetchOperatorsResponses) } metrics := v2.Group("/metrics") { @@ -510,49 +511,62 @@ func (s *ServerV2) FetchOperatorsNodeInfo(c *gin.Context) { c.JSON(http.StatusOK, report) } -// FetchOperatorResponse godoc +// FetchOperatorsResponses godoc // // @Summary Fetch operator attestation response for a batch // @Tags Operators // @Produce json // @Param batch_header_hash path string true "Batch header hash in hex string" -// @Param operator_id query string false "Operator ID in hex string" -// @Success 200 {object} OperatorDispersalResponse +// @Param operator_id query string false "Operator ID in hex string [default: all operators if unspecified]" +// @Success 200 {object} OperatorDispersalResponses // @Failure 400 {object} ErrorResponse "error: Bad request" // @Failure 404 {object} ErrorResponse "error: Not found" // @Failure 500 {object} ErrorResponse "error: Server error" // @Router /operators/{batch_header_hash} [get] -func (s *ServerV2) FetchOperatorResponse(c *gin.Context) { +func (s *ServerV2) FetchOperatorsResponses(c *gin.Context) { timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { - s.metrics.ObserveLatency("FetchOperatorResponse", f*1000) // make milliseconds + s.metrics.ObserveLatency("FetchOperatorsResponses", f*1000) // make milliseconds })) defer timer.ObserveDuration() batchHeaderHashHex := c.Param("batch_header_hash") batchHeaderHash, err := dataapi.ConvertHexadecimalToBytes([]byte(batchHeaderHashHex)) if err != nil { - s.metrics.IncrementInvalidArgRequestNum("FetchOperatorResponse") + s.metrics.IncrementInvalidArgRequestNum("FetchOperatorsResponses") errorResponse(c, errors.New("invalid batch header hash")) return } operatorIdStr := c.DefaultQuery("operator_id", "") - operatorId, err := core.OperatorIDFromHex(operatorIdStr) - if err != nil { - s.metrics.IncrementInvalidArgRequestNum("FetchOperatorResponse") - errorResponse(c, errors.New("invalid operatorId")) - return - } - operatorResponse, err := s.blobMetadataStore.GetDispersalResponse(c.Request.Context(), batchHeaderHash, operatorId) - if err != nil { - s.metrics.IncrementFailedRequestNum("FetchOperatorResponse") - errorResponse(c, err) - return + operatorResponses := make([]*corev2.DispersalResponse, 0) + if operatorIdStr == "" { + res, err := s.blobMetadataStore.GetDispersalResponses(c.Request.Context(), batchHeaderHash) + if err != nil { + s.metrics.IncrementFailedRequestNum("FetchOperatorsResponses") + errorResponse(c, err) + return + } + operatorResponses = append(operatorResponses, res...) + } else { + operatorId, err := core.OperatorIDFromHex(operatorIdStr) + if err != nil { + s.metrics.IncrementInvalidArgRequestNum("FetchOperatorsResponses") + errorResponse(c, errors.New("invalid operatorId")) + return + } + + res, err := s.blobMetadataStore.GetDispersalResponse(c.Request.Context(), batchHeaderHash, operatorId) + if err != nil { + s.metrics.IncrementFailedRequestNum("FetchOperatorsResponses") + errorResponse(c, err) + return + } + operatorResponses = append(operatorResponses, res) } - response := &OperatorDispersalResponse{ - Response: operatorResponse, + response := &OperatorDispersalResponses{ + Responses: operatorResponses, } - s.metrics.IncrementSuccessfulRequestNum("FetchOperatorResponse") + s.metrics.IncrementSuccessfulRequestNum("FetchOperatorsResponses") c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxOperatorResponseAge)) c.JSON(http.StatusOK, response) } diff --git a/disperser/dataapi/v2/server_v2_test.go b/disperser/dataapi/v2/server_v2_test.go index 13d923dd0..1580c6c57 100644 --- a/disperser/dataapi/v2/server_v2_test.go +++ b/disperser/dataapi/v2/server_v2_test.go @@ -484,7 +484,7 @@ func TestCheckOperatorsReachability(t *testing.T) { mockSubgraphApi.Calls = nil } -func TestFetchOperatorResponse(t *testing.T) { +func TestFetchOperatorResponses(t *testing.T) { r := setUpRouter() ctx := context.Background() @@ -515,7 +515,26 @@ func TestFetchOperatorResponse(t *testing.T) { err = blobMetadataStore.PutDispersalResponse(ctx, dispersalResponse) assert.NoError(t, err) - r.GET("/v2/operators/response/:batch_header_hash", testDataApiServerV2.FetchOperatorResponse) + // Set up the other dispersal response in metadata store + operatorId2 := core.OperatorID{2, 3} + dispersalRequest2 := &corev2.DispersalRequest{ + OperatorID: operatorId2, + OperatorAddress: gethcommon.HexToAddress("0x1234567"), + Socket: "socket", + DispersedAt: uint64(time.Now().UnixNano()), + BatchHeader: *batchHeader, + } + dispersalResponse2 := &corev2.DispersalResponse{ + DispersalRequest: dispersalRequest2, + RespondedAt: uint64(time.Now().UnixNano()), + Signature: [32]byte{1, 1, 1}, + Error: "", + } + err = blobMetadataStore.PutDispersalResponse(ctx, dispersalResponse2) + assert.NoError(t, err) + + // Fetch response of a specific operator + r.GET("/v2/operators/response/:batch_header_hash", testDataApiServerV2.FetchOperatorsResponses) w := httptest.NewRecorder() reqStr := fmt.Sprintf("/v2/operators/response/%s?operator_id=%v", batchHeaderHash, operatorId.Hex()) req := httptest.NewRequest(http.MethodGet, reqStr, nil) @@ -526,11 +545,31 @@ func TestFetchOperatorResponse(t *testing.T) { data, err := io.ReadAll(res.Body) assert.NoError(t, err) - var response serverv2.OperatorDispersalResponse + var response serverv2.OperatorDispersalResponses err = json.Unmarshal(data, &response) assert.NoError(t, err) assert.NotNil(t, response) - assert.Equal(t, response.Response, dispersalResponse) + assert.Equal(t, 1, len(response.Responses)) + assert.Equal(t, response.Responses[0], dispersalResponse) + + // Fetch all operators' responses for a batch + reqStr2 := fmt.Sprintf("/v2/operators/response/%s", batchHeaderHash) + w2 := httptest.NewRecorder() + req2 := httptest.NewRequest(http.MethodGet, reqStr2, nil) + r.ServeHTTP(w2, req2) + assert.Equal(t, w2.Code, http.StatusOK) + res2 := w2.Result() + defer res2.Body.Close() + data2, err := io.ReadAll(res2.Body) + assert.NoError(t, err) + + var response2 serverv2.OperatorDispersalResponses + err = json.Unmarshal(data2, &response2) + assert.NoError(t, err) + assert.NotNil(t, response2) + assert.Equal(t, 2, len(response2.Responses)) + assert.Equal(t, response2.Responses[0], dispersalResponse) + assert.Equal(t, response2.Responses[1], dispersalResponse2) } func TestFetchOperatorsStake(t *testing.T) {