Skip to content

Commit

Permalink
Send performance notification (#88)
Browse files Browse the repository at this point in the history
* Send performance notification

* add min genesis time for each network

* Do not send performance notification if report is older than 24h (#89)

* performance notification 24h check

* improve performance notification

* 1 day

* simplify logs

---------

Co-authored-by: Marc Font <[email protected]>
  • Loading branch information
pablomendezroyo and Marketen authored Dec 13, 2024
1 parent 7082c69 commit f6cccf2
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 19 deletions.
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func main() {
distributionLogUpdatedScannerService := services.NewDistributionLogUpdatedEventScanner(storageAdapter, notifierAdapter, executionAdapter, csFeeDistributorImplAdapter, networkConfig.CsFeeDistributorBlockDeployment)
validatorExitRequestScannerService := services.NewValidatorExitRequestEventScanner(storageAdapter, notifierAdapter, veboAdapter, executionAdapter, beaconchainAdapter, networkConfig.VeboBlockDeployment)
validatorEjectorService := services.NewValidatorEjectorService(storageAdapter, notifierAdapter, exitValidatorAdapter, beaconchainAdapter)
pendingHashesLoaderService := services.NewPendingHashesLoader(storageAdapter, ipfsAdapter)
pendingHashesLoaderService := services.NewPendingHashesLoader(storageAdapter, notifierAdapter, ipfsAdapter, networkConfig.MinGenesisTime)
// relaysCheckerService := services.NewRelayCronService(relaysAllowedAdapter, relaysUsedAdapter, notifierAdapter)

// Start domain services
Expand Down
134 changes: 116 additions & 18 deletions internal/application/services/loadPendingHashes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,30 @@ package services

import (
	"context"
	"fmt"
	"math/big"
	"sort"
	"strings"
	"sync"
	"time"

	"lido-events/internal/application/domain"
	"lido-events/internal/application/ports"
	"lido-events/internal/logger"
)

// PendingHashesLoader groups the ports and settings used by the service that
// loads pending report hashes, stores per-operator reports and sends
// performance notifications.
type PendingHashesLoader struct {
storagePort ports.StoragePort
ipfsPort ports.IpfsPort
servicePrefix string
storagePort ports.StoragePort
notifierPort ports.NotifierPort // used to send performance notifications
ipfsPort ports.IpfsPort
minGenesisTime uint64 // base Unix timestamp (seconds) that report epochs are offset from when computing a report's age
servicePrefix string // prefix passed to the logger for every message emitted by this service
}

// NewPendingHashesLoader returns a PendingHashesLoader wired with the given
// storage, notifier and IPFS ports and the network's minimum genesis time.
// The logging prefix is always "PendingHashesLoader".
func NewPendingHashesLoader(storagePort ports.StoragePort, ipfsPort ports.IpfsPort) *PendingHashesLoader {
func NewPendingHashesLoader(storagePort ports.StoragePort, notifierPort ports.NotifierPort, ipfsPort ports.IpfsPort, minGenesisTime uint64) *PendingHashesLoader {
// Positional composite literal: the value order must match the struct's field order.
return &PendingHashesLoader{
storagePort,
notifierPort,
ipfsPort,
minGenesisTime,
"PendingHashesLoader",
}
}
Expand Down Expand Up @@ -55,33 +62,30 @@ func (phl *PendingHashesLoader) LoadPendingHashesCron(ctx context.Context, inter
}
}

// LoadPendingHashes loads all pending hashes from storage and fetches the corresponding IPFS data.
func (phl *PendingHashesLoader) LoadPendingHashes() error {
// Get operator IDs
operatorIDs, err := phl.storagePort.GetOperatorIds()
if err != nil {
logger.ErrorWithPrefix(phl.servicePrefix, "Failed to get operator IDs", err)
return err
}
// Skip if there are no operator IDs
if len(operatorIDs) == 0 {
logger.InfoWithPrefix(phl.servicePrefix, "No operator IDs found; skipping loading pending hashes")
return nil
}

// Get all pending hashes from the storage
// Get all pending hashes
pendingHashes, err := phl.storagePort.GetPendingHashes()
if err != nil {
logger.ErrorWithPrefix(phl.servicePrefix, "Failed to get pending hashes", err)
return err
}
// Skip if there are no pending hashes
if len(pendingHashes) == 0 {
logger.InfoWithPrefix(phl.servicePrefix, "No pending hashes found; skipping loading pending hashes")
return nil
}

// Fetch and parse IPFS data for each pending hash
// Process each pending hash
for _, pendingHash := range pendingHashes {
logger.DebugWithPrefix(phl.servicePrefix, "Fetching and parsing IPFS data for pending hash %s", pendingHash)

Expand All @@ -91,37 +95,131 @@ func (phl *PendingHashesLoader) LoadPendingHashes() error {
continue
}

// Process each operator ID in the original report
for _, operatorID := range operatorIDs {
logger.DebugWithPrefix(phl.servicePrefix, "Saving report data for operator ID %s", operatorID.String())
logger.DebugWithPrefix(phl.servicePrefix, "Processing data for operator ID %s", operatorID.String())

// Get the data for the operator ID
data, exists := originalReport.Operators[operatorID.String()]
if !exists {
// Skip if the operator ID is not found in the report
logger.WarnWithPrefix(phl.servicePrefix, "Operator ID %s not found in the original report, skipping", operatorID.String())
continue
}

// Check performance and send notifications if needed
// Only sends the notification if the report is from the last 24 hours. Calculated from genesis timestamp & report epoch
phl.CheckAndNotifyPerformance(operatorID, data.Validators, originalReport)

// Save the report
report := domain.Report{
Frame: originalReport.Frame,
Data: data,
Threshold: originalReport.Threshold,
}

// Save the report data for the operator ID
if err := phl.storagePort.SaveReport(operatorID, report); err != nil {
logger.ErrorWithPrefix(phl.servicePrefix, "Failed to save report data for operator ID %s: %v", operatorID.String(), err)
logger.ErrorWithPrefix(phl.servicePrefix, "Failed to save report for operator ID %s: %v", operatorID.String(), err)
continue
}
}

// Remove the pending hash from storage
// Remove the pending hash
if err := phl.storagePort.DeletePendingHash(pendingHash); err != nil {
logger.ErrorWithPrefix(phl.servicePrefix, "Failed to delete pending hash %s: %v", pendingHash, err)
continue
}
}

return nil
}

// CheckAndNotifyPerformance reports how an operator's validators performed in a
// single report frame.
//
// For every validator with more than 60 assigned duties it computes the ratio
// Perf.Included / Perf.Assigned and compares it with originalReport.Threshold.
// Validators below the threshold are listed in a "bad performance"
// notification, all others in a "good performance" notification; each
// notification is sent only if it has at least one row. Nothing is sent when
// the report is older than 24 hours, where the report time is derived as
// minGenesisTime + Frame[1]*384 seconds, or when that time lies in the future.
//
// Parameters:
//   - operatorID: operator the validators belong to (used in messages and logs).
//   - validators: the operator's validators from the report, keyed by validator ID.
//   - originalReport: the report supplying Frame (start and end epoch) and Threshold.
//
// Failures to send a notification are logged and not returned.
func (phl *PendingHashesLoader) CheckAndNotifyPerformance(operatorID *big.Int, validators map[string]domain.Validator, originalReport domain.OriginalReport) {
	const (
		secondsPerEpoch = 384   // seconds added per epoch when dating the report (32 slots of 12 s)
		secondsPerDay   = 86400 // reports older than this are not notified
		secondsPerHour  = 3600
		minAssigned     = 60 // validators need strictly more assigned duties than this to be considered
	)

	endEpoch := originalReport.Frame[1]

	// Date the report from its end epoch and decide up front whether it is
	// recent enough, so no tables are built for reports that will be skipped.
	reportTimestamp := phl.minGenesisTime + uint64(endEpoch)*secondsPerEpoch
	currentTime := uint64(time.Now().Unix())

	// Both values are unsigned: subtracting a future timestamp would wrap
	// around to a huge "age". Skip explicitly instead of relying on that.
	if reportTimestamp > currentTime {
		logger.WarnWithPrefix(phl.servicePrefix, "Skipping notification for operator ID %s, report (epoch %d) has a timestamp in the future", operatorID.String(), endEpoch)
		return
	}

	timeDiff := currentTime - reportTimestamp
	if timeDiff > secondsPerDay {
		logger.DebugWithPrefix(phl.servicePrefix, "Skipping notification for operator ID %s, report (epoch %d) is older than 24h", operatorID.String(), endEpoch)
		return
	}

	timeAgo := fmt.Sprintf("%d days and %d hours ago", timeDiff/secondsPerDay, (timeDiff%secondsPerDay)/secondsPerHour)

	// Map iteration order is random; sort the IDs so the notification rows are
	// stable between runs. Shorter IDs come first, then lexicographic order,
	// which yields numeric order if the IDs are decimal strings without
	// leading zeros (NOTE(review): confirm the ID format).
	ids := make([]string, 0, len(validators))
	for id := range validators {
		ids = append(ids, id)
	}
	sort.Slice(ids, func(i, j int) bool {
		if len(ids[i]) != len(ids[j]) {
			return len(ids[i]) < len(ids[j])
		}
		return ids[i] < ids[j]
	})

	var badValidators, goodValidators []string
	for _, id := range ids {
		perf := validators[id].Perf
		if perf.Assigned <= minAssigned {
			continue
		}

		// The threshold is handled as a fraction (the messages below multiply
		// it by 100 for display), so compare the raw ratio against it and only
		// scale to a percentage for the table row.
		ratio := float64(perf.Included) / float64(perf.Assigned)
		row := fmt.Sprintf("| %s | %.2f%% |", id, ratio*100)
		if ratio < originalReport.Threshold {
			badValidators = append(badValidators, row)
		} else {
			goodValidators = append(goodValidators, row)
		}
	}

	// send builds the table under the given headline and delivers it; it does
	// nothing when there are no rows for that category.
	send := func(kind, headline string, rows []string) {
		if len(rows) == 0 {
			return
		}

		var b strings.Builder
		b.WriteString(headline)
		b.WriteString("| Validator ID | Performance (%) |\n")
		b.WriteString("|--------------|-----------------|\n")
		b.WriteString(strings.Join(rows, "\n"))
		b.WriteString("\n")

		logger.InfoWithPrefix(phl.servicePrefix, "Sending %s performance notification for report epoch %d", kind, endEpoch)
		if err := phl.notifierPort.SendNotification(b.String()); err != nil {
			logger.ErrorWithPrefix(phl.servicePrefix, "Failed to send %s performance notification: %v", kind, err)
		}
	}

	thresholdPct := originalReport.Threshold * 100 // fraction -> percentage for display

	send("bad", fmt.Sprintf(
		"- 🚨 Operator ID: %s, from epoch %d to epoch %d (%s), the following validators have had bad performance below the threshold of %.2f%%:\n",
		operatorID.String(), originalReport.Frame[0], endEpoch, timeAgo, thresholdPct,
	), badValidators)

	send("good", fmt.Sprintf(
		"- ✅ Operator ID: %s, from epoch %d to epoch %d (%s), the following validators performed well (above the threshold of %.2f%%):\n",
		operatorID.String(), originalReport.Frame[0], endEpoch, timeAgo, thresholdPct,
	), goodValidators)
}
5 changes: 5 additions & 0 deletions internal/config/config_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type Config struct {
// Lido specifics
LidoKeysApiUrl string
ProxyApiPort uint64

// Blockchain
MinGenesisTime uint64
}

// Helper function to parse and validate CORS from environment variable
Expand Down Expand Up @@ -160,6 +163,7 @@ func LoadNetworkConfig() (Config, error) {
CSModuleAddress: common.HexToAddress("0x4562c3e63c2e586cD1651B958C22F88135aCAd4f"),
LidoKeysApiUrl: "https://keys-api-holesky.testnet.fi",
ProxyApiPort: proxyApiPort,
MinGenesisTime: uint64(1695902400),
}
case "mainnet":
// Configure default values for the mainnet
Expand Down Expand Up @@ -196,6 +200,7 @@ func LoadNetworkConfig() (Config, error) {
CSModuleAddress: common.HexToAddress("0xdA7dE2ECdDfccC6c3AF10108Db212ACBBf9EA83F"),
LidoKeysApiUrl: "https://keys-api.lido.fi",
ProxyApiPort: proxyApiPort,
MinGenesisTime: uint64(1606824023),
}
default:
logger.Fatal("Unknown network: %s", network)
Expand Down

0 comments on commit f6cccf2

Please sign in to comment.