diff --git a/internal/config/config.go b/internal/config/config.go index 3daae63f..0df81c45 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -118,7 +118,6 @@ type Config struct { QuickNodeEthereumRpcConfig EthereumRpcConfig PostgresConfig PostgresConfig EtherscanConfig EtherscanConfig - RabbitMqConfig RabbitMqConfig } type EthereumRpcConfig struct { @@ -138,14 +137,6 @@ type EtherscanConfig struct { ApiKeys []string } -type RabbitMqConfig struct { - Username string - Password string - Url string - Secure bool - PrefetchCount int -} - func NewConfig() *Config { return &Config{ Network: ParseNetwork(getPrefixedEnvVar("NETWORK")), @@ -174,14 +165,6 @@ func NewConfig() *Config { EtherscanConfig: EtherscanConfig{ ApiKeys: parseListEnvVar(getPrefixedEnvVar("ETHERSCAN_API_KEYS")), }, - - RabbitMqConfig: RabbitMqConfig{ - Username: getPrefixedEnvVar("RABBITMQ_USERNAME"), - Password: getPrefixedEnvVar("RABBITMQ_PASSWORD"), - Url: getPrefixedEnvVar("RABBITMQ_URL"), - Secure: parseBooleanEnvVar(getPrefixedEnvVar("RABBITMQ_SECURE")), - PrefetchCount: parseIntEnvVar(getPrefixedEnvVar("RABBITMQ_PREFETCH_COUNT"), 1), - }, } } diff --git a/internal/queue/rabbitmq/config.go b/internal/queue/rabbitmq/config.go deleted file mode 100644 index 60c4ecd7..00000000 --- a/internal/queue/rabbitmq/config.go +++ /dev/null @@ -1,109 +0,0 @@ -package rabbitmq - -import "github.com/Layr-Labs/sidecar/internal/clients/ethereum" - -const ( - Queue_blockIndexer = "block-indexer" - Queue_transactionParser = "transaction-parser" - Queue_contractIndexer = "contract-indexer" - Queue_restakeStrategiesAllBlocks = "restake-strategies-all-blocks" - Queue_restakeStrategies = "restake-strategies" - - RoutingKey_blockIndexer = "block-indexer" - RoutingKey_transactionParser = "transaction-parser" - RoutingKey_contractIndexer = "contract-indexer" - RoutingKey_restakeStrategies = "restake-strategies" - - Exchange_blocklake = "blocklake" - Exchange_blocks = "blocks" -) - -func GetQueuesAndExchanges() ([]*RabbitMQQueue, []*RabbitMQExchange) { - queues := []*RabbitMQQueue{ - { - Name: Queue_blockIndexer, - Durable: true, - AutoAck: false, - Exclusive: false, - BindExchange: Exchange_blocklake, - BindRoutingKey: RoutingKey_blockIndexer, - }, { - Name: Queue_transactionParser, - Durable: true, - AutoAck: false, - Exclusive: false, - BindExchange: Exchange_blocklake, - BindRoutingKey: RoutingKey_transactionParser, - }, { - Name: Queue_contractIndexer, - Durable: true, - AutoAck: false, - Exclusive: false, - BindExchange: Exchange_blocklake, - BindRoutingKey: RoutingKey_contractIndexer, - }, - // We bind Queue_restakeStrategiesAllBlocks to two different exchanges: - // - blocklake is used for directly queueing to it - // - blocks is a fanout exchange for anything that wants to listen to all blocks produced. - { - Name: Queue_restakeStrategies, - Durable: true, - AutoAck: false, - Exclusive: false, - BindExchange: Exchange_blocklake, - BindRoutingKey: RoutingKey_restakeStrategies, - }, { - Name: Queue_restakeStrategiesAllBlocks, - Durable: true, - AutoAck: false, - Exclusive: false, - BindExchange: Exchange_blocks, - BindRoutingKey: RoutingKey_restakeStrategies, - }, - } - - exchanges := []*RabbitMQExchange{ - { - Name: "blocklake", - Durable: true, - AutoDelete: false, - Kind: "topic", - }, { - Name: "blocks", - Durable: true, - AutoDelete: false, - Kind: "fanout", - }, - } - - return queues, exchanges -} - -type BlockIndexerMessage struct { - BlockNumber uint64 - Reindex bool -} - -type TransactionParserMessage struct { - BlockNumber uint64 - BlockSequenceId uint64 - Transactions []*ethereum.EthereumTransaction - Receipts map[string]*ethereum.EthereumTransactionReceipt - Reprocess bool -} - -type ContractIndexerParserMessage struct { - BlockNumber uint64 -} - -type ReIndexTransactionMessage struct { - TransactionHash string -} - -type BlockProcessedMessage struct { - BlockNumber uint64 -} - -type ReIndexRestakedStrategies struct { - BlockNumber uint64 -} diff --git a/internal/queue/rabbitmq/rabbitmq.go b/internal/queue/rabbitmq/rabbitmq.go deleted file mode 100644 index b2291a0d..00000000 --- a/internal/queue/rabbitmq/rabbitmq.go +++ /dev/null @@ -1,121 +0,0 @@ -package rabbitmq - -import ( - "fmt" - amqp "github.com/rabbitmq/amqp091-go" - "go.uber.org/zap" -) - -type RabbitMQQueue struct { - Name string - Durable bool - AutoAck bool - Exclusive bool - BindExchange string - BindRoutingKey string -} - -type RabbitMQExchange struct { - Name string - Durable bool - AutoDelete bool - Kind string -} - -type RabbitMQConfig struct { - Username string - Password string - Url string - Secure bool - Queues []*RabbitMQQueue - Exchanges []*RabbitMQExchange -} - -type RabbitMQ struct { - logger *zap.Logger - config *RabbitMQConfig - connection *amqp.Connection - channel *amqp.Channel -} - -func NewRabbitMQ(config *RabbitMQConfig, l *zap.Logger) *RabbitMQ { - return &RabbitMQ{ - config: config, - logger: l, - } -} - -func (r *RabbitMQ) Connect() (*amqp.Connection, error) { - connUrl := buildConnectionUrl(r.config) - r.logger.Sugar().Debug(fmt.Sprintf("Connecting to RabbitMQ at %s", connUrl)) - conn, err := amqp.Dial(connUrl) - if err != nil { - r.logger.Sugar().Errorw(fmt.Sprintf("Failed to connect to RabbitMQ: %v", err)) - return nil, err - } - r.connection = conn - - ch, err := conn.Channel() - if err != nil { - r.logger.Sugar().Errorw(fmt.Sprintf("Failed to open a channel: %v", err)) - return nil, err - } - r.channel = ch - - for _, e := range r.config.Exchanges { - r.logger.Sugar().Debug(fmt.Sprintf("Declaring exchange %s", e.Name)) - err = r.channel.ExchangeDeclare(e.Name, e.Kind, e.Durable, e.AutoDelete, false, false, nil) - if err != nil { - return nil, err - } - } - - for _, q := range r.config.Queues { - r.logger.Sugar().Debug(fmt.Sprintf("Declaring queue %s", q.Name)) - _, err = r.channel.QueueDeclare(q.Name, q.Durable, false, false, false, nil) - if err != nil { - return nil, err - } - - if q.BindExchange != "" { - r.logger.Sugar().Debug(fmt.Sprintf("Binding queue %s to exchange %s with routing key %s", q.Name, q.BindExchange, q.BindRoutingKey)) - err = r.channel.QueueBind(q.Name, q.BindRoutingKey, q.BindExchange, false, nil) - if err != nil { - return nil, err - } - } - - } - - return conn, nil -} - -func (r *RabbitMQ) SetQos(prefetchCount int) error { - return r.channel.Qos(prefetchCount, 0, false) -} - -func (r *RabbitMQ) Publish(exchangeName string, routingKey string, publishing amqp.Publishing) error { - return r.channel.Publish(exchangeName, routingKey, false, false, publishing) -} - -func (r *RabbitMQ) Consume(queueName, consumerName string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) { - return r.channel.Consume(queueName, consumerName, autoAck, exclusive, noLocal, noWait, args) -} - -func (r *RabbitMQ) PurgeAllQueues() { - for _, q := range r.config.Queues { - r.PurgeQueue(q.Name, false) - } -} - -func (r *RabbitMQ) PurgeQueue(queueName string, noWait bool) (int, error) { - return r.channel.QueuePurge(queueName, noWait) -} - -func buildConnectionUrl(cfg *RabbitMQConfig) string { - protocol := "amqp" - if cfg.Secure { - protocol = "amqps" - } - return fmt.Sprintf("%s://%s:%s@%s", protocol, cfg.Username, cfg.Password, cfg.Url) -} diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 0e83740e..40bd86a2 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -1,7 +1,6 @@ package storage import ( - "github.com/Layr-Labs/sidecar/internal/clients/ethereum" "github.com/Layr-Labs/sidecar/internal/parser" "time" ) @@ -82,10 +81,6 @@ type OperatorRestakedStrategies struct { } // Not tables -type BatchInsertTransactionLogs struct { - Transaction *ethereum.EthereumTransaction - ParsedTransaction *parser.ParsedTransaction -} type ActiveAvsOperator struct { Avs string diff --git a/protos/eigenlayer/blocklake/v1/api.proto b/protos/eigenlayer/blocklake/v1/api.proto index cdbd7de3..1b784b58 100644 --- a/protos/eigenlayer/blocklake/v1/api.proto +++ b/protos/eigenlayer/blocklake/v1/api.proto @@ -47,9 +47,6 @@ message BackfillRequest { message BackfillResponse { } -message PurgeQueuesRequest{} -message PurgeQueuesResponse{} - message IndexContractsRequest { BackfillRange range = 1; } @@ -76,11 +73,6 @@ service Backfiller { body: "*" }; } - rpc PurgeQueues(PurgeQueuesRequest) returns (PurgeQueuesResponse) { - option (google.api.http) = { - post: "/v1/purge-queues" - }; - } rpc IndexContracts(IndexContractsRequest) returns (IndexContractsResponse) { option (google.api.http) = { post: "/v1/index-contracts" diff --git a/scripts/deployWriterPostgresProxy.sh b/scripts/deployWriterPostgresProxy.sh deleted file mode 100755 index 78a7a673..00000000 --- a/scripts/deployWriterPostgresProxy.sh +++ /dev/null @@ -1,8 +0,0 @@ -#!/usr/bin/env bash - -kubectl run postgres-proxy-write \ - --image docker.io/alpine/socat \ - --namespace blocklake-dev \ - -- \ - tcp-listen:5432,fork,reuseaddr \ - tcp-connect:blocklake.cluster-cjg0ui0ksnx8.us-east-1.rds.amazonaws.com:5432 diff --git a/scripts/helperPods/deployPostgresProxy.sh b/scripts/helperPods/deployPostgresProxy.sh deleted file mode 100755 index 6cf1834e..00000000 --- a/scripts/helperPods/deployPostgresProxy.sh +++ /dev/null @@ -1,6 +0,0 @@ -kubectl run postgres-proxy-write \ - --image docker.io/alpine/socat \ - --namespace blocklake-dev \ - -- \ - tcp-listen:5432,fork,reuseaddr \ - tcp-connect:blocklake.cluster-cjg0ui0ksnx8.us-east-1.rds.amazonaws.com:5432 diff --git a/scripts/helperPods/deployPostgresProxyRo.sh b/scripts/helperPods/deployPostgresProxyRo.sh deleted file mode 100755 index 665d48cc..00000000 --- a/scripts/helperPods/deployPostgresProxyRo.sh +++ /dev/null @@ -1,6 +0,0 @@ -kubectl run postgres-proxy \ - --image docker.io/alpine/socat \ - --namespace blocklake-dev \ - -- \ - tcp-listen:5432,fork,reuseaddr \ - tcp-connect:blocklake.cluster-ro-cjg0ui0ksnx8.us-east-1.rds.amazonaws.com:5432