Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Develop refactor #9

Merged
merged 10 commits into from
Dec 27, 2024
Merged

Develop refactor #9

merged 10 commits into from
Dec 27, 2024

Conversation

leonz789
Copy link
Collaborator

@leonz789 leonz789 commented Dec 25, 2024

Summary by CodeRabbit

Release Notes

  • New Features

    • Enhanced price feeder system with improved initialization and error handling.
    • Introduced new event subscription logic for better management of WebSocket connections.
    • Added functionality for handling token sources and configuration management.
    • New methods for managing Ethereum clients and aggregators in the chainlink package.
    • Introduced a new gRPC client for interacting with the Exocore network.
  • Bug Fixes

    • Improved error handling across various components, providing clearer context in error messages.
  • Refactor

    • Streamlined the structure of the fetcher and exoclient packages for better maintainability and clarity.
    • Reorganized interfaces and structs to enhance functionality and reduce redundancy.
    • Updated the management of staker validators for improved clarity and efficiency.
  • Tests

    • Added new test cases to validate configuration management.

Copy link

coderabbitai bot commented Dec 25, 2024

Warning

Rate limit exceeded

@leonz789 has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 9 minutes and 11 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between 9052596 and 0f42724.

📒 Files selected for processing (4)
  • exoclient/tx.go (2 hunks)
  • exoclient/types.go (2 hunks)
  • fetcher/chainlink/chainlink.go (2 hunks)
  • types/types.go (3 hunks)

Walkthrough

The pull request introduces comprehensive changes across multiple packages, focusing on enhancing the price feeder system's architecture, error handling, and functionality. Key modifications include refactoring the client and fetcher components, improving WebSocket and gRPC connection management, and implementing more robust initialization and event handling mechanisms. The changes streamline the code structure, introduce better concurrency management, and provide more detailed error reporting across various modules like exoclient, fetcher, and cmd.

Changes

File Change Summary
.gitignore Updated log file ignore pattern from node*.log to *.log
cmd/feeder_tool.go Added retry logic, new initialization functions, enhanced error handling for oracle parameters and feeder updates
cmd/start.go Added commented-out PreRun function
cmd/types.go Introduced new interfaces, structs for price fetching, submitting, and managing feeders
exoclient/* Significant refactoring of gRPC and WebSocket client, added new methods for connection management, event handling
fetcher/beaconchain/* Updated staker validator management, improved error handling and logging
fetcher/chainlink/* Restructured price fetching logic, enhanced client and token management
fetcher/fetcher.go Redesigned fetcher initialization, added methods for source and token management
types/* Added new configuration structures, error variables, and interfaces

Sequence Diagram

sequenceDiagram
    participant Client as ExoClient
    participant Fetcher as PriceFetcher
    participant Oracle as OracleService
    
    Client->>Fetcher: Initialize Sources
    Fetcher-->>Client: Sources Initialized
    
    loop Price Update Cycle
        Client->>Fetcher: Request Latest Price
        Fetcher->>Oracle: Fetch Price Data
        Oracle-->>Fetcher: Return Price
        Fetcher-->>Client: Provide Price
        
        Client->>Oracle: Submit Price Transaction
        Oracle-->>Client: Transaction Response
    end
Loading

Poem

🐰 Hopping through code with glee,
Refactoring lines, setting modules free
Retry logic tight, errors now clear
WebSocket dancing, no more fear
Price feeders leap, a rabbit's delight! 🚀


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai or @coderabbitai title anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 9

🧹 Nitpick comments (39)
fetcher/chainlink/chainlink.go (1)

71-89: Potential grammar fix and enhancement.

  1. The error message at line 88 says "token not found in reloaded config filed". Suggest changing “filed” to “file.”
  2. reload method design looks fine, but you might want to log if partial updates are successful for networks before the token is reported as missing.
- return errors.New("token not found in reloaded config filed")
+ return errors.New("token not found in reloaded config file")
exoclient/tx.go (2)

51-54: Consider using %#x for byte slices.
The format specifier %b prints the binary representation. Using %x or %X often makes the output easier to interpret for debugging.

- return nil, fmt.Errorf("failed to encode singedTx, txBytes:%b, msg:%v, ...
+ return nil, fmt.Errorf("failed to encode signedTx, txBytes:%x, msg:%v, ...

57-69: Minor spelling error in broadcast error message.
"braodcast" → "broadcast" in line 66.

- return nil, fmt.Errorf("failed to braodcast transaction, ...
+ return nil, fmt.Errorf("failed to broadcast transaction, ...
exoclient/client.go (1)

23-54: Struct fields are well organized.
Storing the logger, WebSocket, and gRPC references in one place clarifies ownership. Consider continuing to tweak concurrency usage (e.g., using atomic counters) if concurrency intensifies.

fetcher/chainlink/types.go (1)

63-77: addClient omits locking by design.
If concurrency is possible, consider clarifying that concurrency is managed externally, or add a brief comment. Otherwise, the approach is consistent with the stated docstring.

cmd/feeder_tool.go (1)

158-197: initComponents partially depends on external variables like sourcesPath and mnemonic.
This approach can work but slightly reduces modular flexibility. If possible, consider passing them explicitly to the function to avoid hidden dependencies.

fetcher/fetcher.go (6)

21-25: Global logger & defaultFetcher usage
Consider documenting why defaultFetcher is a package-level global. Global singletons can create hidden dependencies and complicate testing. If multiple fetchers become necessary later, refactoring could be cumbersome.


38-51: Request structs
These small request structs are straightforward. Consider adding brief docstrings to clarify usage.


59-68: NewFetcher constructor
The buffer size of 5 for channels is somewhat arbitrary. If usage grows, consider using a dynamic or differently sized buffer. Otherwise, this design is okay.


84-91: AddTokenForSourceUnBlocked
This variant is asynchronous, but the function is labeled with a TODO. Consider adding docstrings to clarify the differences with the blocking version and confirm the intended usage.


94-105: InitNSTStakerValidators
Directly calling beaconchain.ResetStakerValidators from here couples the fetcher to the beaconchain package. A more decoupled design might expose an interface.


139-183: Goroutine for request handling
Each select case is well-structured. The code is readable, though logs might be improved (e.g., unify error logs format). Also ensure that channel operations don’t block if the fetcher is stopped midway.

fetcher/beaconchain/types.go (4)

24-27: Source struct
The embedded *types.Source is convenient, but keep in mind that collisions in field or method names can happen if the embedded type grows.


51-60: length method
Short utility method. The nil check is safe-guarding, but normally you might not expect a nil stakerVList.


156-157: logger & defaultSource
Same remarks as with other packages: keep an eye on global variables for testability.


Line range hint 164-229: initBeaconchain
Good expanded error messages with fmt.Sprintf. The function sets up the default source, config file parsing, etc. The logic is fairly linear. Consider logging or returning partial progress if an error occurs mid-initialization.

fetcher/beaconchain/beaconchain.go (2)

71-72: Default staker validators
defaultStakerValidators = newStakerVList() is used globally. Using a singleton pattern can hamper unit testing or multi-chain scenarios.


Line range hint 94-160: fetch
The logic of checking finalization, computing rema for stakers, collecting partial requests if >100 validators, etc., is well structured. Consider additional logging for large sets, timeouts, or partial successes.

exoclient/subscribe.go (5)

47-76: Subscribe
Launching a goroutine to handle stop signals and re-establish tasks is a valid approach. Carefully review the ability to stack multiple restarts if wsStop is triggered repeatedly.


146-155: increaseWsRountines
Checks ec.wsActive before incrementing. The naming “Rountines” has a small typo; consider “Routines.”


189-216: sendAllSubscribeMsgs
Resubscribes to each event up to maxRetry times. If any are unsent, the function errors out. Good approach; watch out for partial subscription states.


219-253: startPingRoutine
Sends a ping every 10s. On failure, stops routines. This approach is typical. Potentially consider more robust reconnection logic.


256-319: startReadRoutine
Reads from the websocket, processes events, or panics on unexpected closure. The panic is quite forceful if we can handle a reconnect gracefully.

fetcher/types/types.go (6)

14-17: Consider adding usage documentation for your interface methods.

The SourceInf interface has multiple methods related to lifecycle management (like InitTokens, Start, Stop) and dynamic token management (AddTokenAndStart). Adding a short usage comment for each method will make it easier for future contributors to understand the expected sequence of calls and any invariants (e.g., only call InitTokens before Start).


25-28: Implement all planned lifecycle methods.

You have placeholders for advanced lifecycle management (e.g., partial token stop or config reload). If these methods are not planned in the immediate scope, remove the commented-out lines. Otherwise, make them part of the interface to communicate clear functionality and keep them discoverable for future extension.


125-132: Optimize the update check in PriceSync.Update().

You compare only price and decimal in EqualPrice, but the RoundID might also matter in some contexts. If the desired logic is to skip updates for the same price decimals ignoring round ID, that's fine. Otherwise, consider clarifying or expanding the check to incorporate the round ID.


316-340: Improve clarity on unhandled tokens in AddTokenAndStart().

If the channel is full, you return an error indicating too many pending tokens. That’s valid, but sometimes it might be beneficial to block or use a more advanced queueing strategy. If blocking is undesirable, consider logging a warning or implementing a metric to track how often tokens are being dropped.


357-392: Consider maximum failure threshold for startFetchToken().

The TODO comment (line 378-379) suggests an idea to exit after maximum failures. Otherwise, tokens might keep retrying indefinitely. This could be detrimental if the source is truly misconfigured. Consider implementing an exponential backoff or a maximum failure count to avoid excessive resource usage.


405-416: Promote Status() for improved observability.

Currently, Status() returns whether each token is “active,” but no error or last fetch result is exposed except for the PriceInfo. Consider including the last fetch error or a timestamp of the most recent successful fetch to simplify monitoring and debugging.

cmd/types.go (2)

154-218: Recommend graceful termination for the goroutine in start().

The loop blocks indefinitely on select (line 157). If you need to stop a feeder, consider introducing a closure channel or context to signal the goroutine. Otherwise, these routines remain forever, potentially blocking shutdown or resource cleanup.


342-355: Revisit any newly added feeders after param updates.

In Feeders.Start() (lines 343–390), you handle param updates and trigger updates for all feeders. If a new feeder is added after you have triggered that param update, consider whether the new feeder also needs the updated params or a subsequent call to UpdateOracleParams.

exoclient/query_test.go (1)

10-10: Validate ignoring the additional return values.

You are discarding the extra returns from CreateGrpcConn. Ensure they are indeed optional. If they contain references needed for resource cleanup or instrumentation, consider capturing them. Otherwise, discarding them is acceptable if no further usage is required.

exoclient/grpc.go (1)

16-33: Add context cancellation handling for createGrpcConn.

Currently, if the context times out, you handle the error properly (line 33). However, if the dial attempt returns quickly for other reasons, consider whether you need to clean up derived contexts or exit early. Also, confirm that the keepalive settings are suitable for your environment to avoid accidental disconnects.

exoclient/query.go (3)

11-16: Appropriate error wrapping; consider context keys
This method uses good error wrapping to include context: "failed to query oracle params from oracleClient". If context.WithValue or additional metadata is needed, consider injecting it here to yield more detailed debugging info.


32-35: Verify whether a paginated alternative is needed
The current method fetches all StakerInfos in one request. If the data set is large, consider adding pagination or streaming to handle scalability.

Would you like a suggested plan for introducing pagination?


41-44: Consolidate repetitive code
GetStakerInfo re-uses StakerInfos(context.Context, &...) with the same request struct as GetStakerInfos. If the calls are identical except for filtering logic in the caller, consider DRYing these out or documenting the difference.

types/types.go (3)

13-27: Revisit the commented-out interface
The commented-out interface PriceInfoInf could be central to code clarity if implemented. Consider either fully removing it if it's no longer needed or implementing it if it adds valuable abstraction.


35-39: Ensure flexible configuration for sources
TokenSources uses a single comma-separated string for Sources. This may reduce future extensibility if multiple data attributes per source are needed. Consider using an array or a struct for each source.


40-52: Expand or nest config for better readability
The new Config struct organizes Tokens and Sender details clearly. If the codebase grows, consider nesting or splitting configurations (e.g., separate file or struct handling) to enhance maintainability.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 72b02c5 and b11b1dc.

📒 Files selected for processing (20)
  • .gitignore (1 hunks)
  • cmd/feeder_tool.go (1 hunks)
  • cmd/start.go (1 hunks)
  • cmd/types.go (1 hunks)
  • exoclient/client.go (1 hunks)
  • exoclient/grpc.go (1 hunks)
  • exoclient/query.go (1 hunks)
  • exoclient/query_test.go (1 hunks)
  • exoclient/subscribe.go (1 hunks)
  • exoclient/tx.go (2 hunks)
  • exoclient/types.go (2 hunks)
  • fetcher/beaconchain/beaconchain.go (8 hunks)
  • fetcher/beaconchain/types.go (4 hunks)
  • fetcher/chainlink/chainlink.go (2 hunks)
  • fetcher/chainlink/types.go (2 hunks)
  • fetcher/fetcher.go (1 hunks)
  • fetcher/interface.go (0 hunks)
  • fetcher/types/types.go (1 hunks)
  • types/config_test.go (1 hunks)
  • types/types.go (3 hunks)
💤 Files with no reviewable changes (1)
  • fetcher/interface.go
✅ Files skipped from review due to trivial changes (2)
  • cmd/start.go
  • .gitignore
🧰 Additional context used
🪛 golangci-lint (1.62.2)
exoclient/client.go

1-1: : # github.com/ExocoreNetwork/price-feeder/exoclient [github.com/ExocoreNetwork/price-feeder/exoclient.test]
exoclient/query_test.go:10:14: undefined: CreateGrpcConn
exoclient/query_test.go:12:12: undefined: GetParams

(typecheck)

🔇 Additional comments (80)
fetcher/chainlink/chainlink.go (5)

11-12: Imports look good.
The addition of fetchertypes and feedertypes seems consistent with the new separation of concerns.


19-20: Refactored function signature looks solid.
Switching from a standalone function to a method on source is an improvement in encapsulation, allowing the function to access s.chainlinkProxy directly.


27-27: Error propagation is clear.
Returning a wrapped error when LatestRoundData fails ensures debugging context.


32-32: Consistent error message.
The concise error string matches the pattern used elsewhere, enhancing uniformity.


35-37: Return struct changes are correct.
Casting decimals to int32 is standard practice for on-chain decimal fields. Ensure all consumers accept the new type.

exoclient/tx.go (7)

8-9: Imports for the new oracle package and fetcher types are consistent.
No issues found here.


19-21: Updated function signature improves error handling.
Returning (*sdktx.BroadcastTxResponse, error) is more idiomatic than returning a single object with embedded error states.


23-34: Construction of MsgCreatePrice is organized.
Embedding the entire price fetchertypes.PriceInfo reduces parameter sprawl. Ensure that Chainlink is defined as intended.


45-48: Signing logic with signMsg.
This approach delegates granular signing logic to a dedicated function, improving readability and maintainability.


71-95: Signature placeholder usage.
Using an initial empty signature and then setting the actual signature is acceptable in Cosmos SDK patterns. Just verify that you handle edge cases (e.g., hardware signer or ledger) if relevant.


97-111: Method getSignBytes is well-structured.
Clearly separates retrieving sign bytes from the rest of the flow. Makes debugging signing issues easier.


113-122: getSignature method is simple and clean.
The SingleSignatureData usage is straightforward. Ensure all sign modes are supported long-term if expanding signers.

exoclient/client.go (9)

1-2: Package declaration looks good.
No functional issues noted.

🧰 Tools
🪛 golangci-lint (1.62.2)

1-1: : # github.com/ExocoreNetwork/price-feeder/exoclient [github.com/ExocoreNetwork/price-feeder/exoclient.test]
exoclient/query_test.go:10:14: undefined: CreateGrpcConn
exoclient/query_test.go:12:12: undefined: GetParams

(typecheck)


3-19: Imports match the intended usage.
Includes relevant gRPC, HTTP, WebSocket, and Cosmos SDK types.


21-21: Interface compliance check.
The var _ ExoClientInf = &exoClient{} pattern is a good way to ensure compile-time interface conformance.


56-70: NewExoClient invests in robust connection setup.

  1. Logging each stage fosters better diagnostics.
  2. Confirm that large numbers of short-lived connections won't cause resource exhaustion.

72-105: WebSocket-specific logic is thorough.
Defining custom NetDial and SetPongHandler is valuable for more controlled environment.


106-109: Close orchestrates resource cleanup well.
Calling CloseWs() before CloseGRPC() is logically sound to ensure remote side experiences immediate termination.


111-115: CloseGRPC is straightforward.
No concurrency concerns visible here, so the direct close call seems sufficient.


116-122: CloseWs with a safeguard.
Skipping closure if wsClient == nil prevents panics. Looks good.


124-130: GetClient ensures safe retrieval.
Returning a bool for initialization state is simpler than throwing an error; it clarifies the usage.

fetcher/chainlink/types.go (8)

4-4: Additional errors import recognized.
No conflicts or overshadowing of built-in error handling.


19-24: New source struct is cohesive.
Combining logger, base Source, and chainlinkProxy references in one struct clarifies which components belong together.


33-42: Updated proxy struct with RWMutex and client map.
Using a sync.RWMutex for reading aggregator references is efficient, particularly if reads far outnumber writes.


46-56: Synchronized addition of tokens is correct.
Locking with locker.Lock() ensures aggregator creation is thread-safe. Potentially confirm if multiple calls adding the same token is safe or gracefully handled.


79-81: Read lock usage.
RLock()/RUnlock() around aggregator retrieval is a standard pattern that nicely limits contention.


85-89: Initialization pattern is consistent.
Nothing stands out here; these constants and variables are well-named.


93-129: initChainlink effectively ties config parsing with source initialization.

  1. Logging fallback is handled carefully.
  2. Confirm that all networks are added before tokens, preventing missing references.

Line range hint 132-144: parseConfig uses a dedicated YAML decoder.
Returning early on error is neat. Double-check file handle usage if the file is large or environment constraints are strict, but likely fine for typical use.

cmd/feeder_tool.go (5)

4-5: Imported packages reflect improved error handling.
No obvious naming collisions or overshadowing.


12-25: New constants for retries look good.
defaultMaxRetry = 43200 attempts with retryInterval = 2s results in a 24-hour span. That’s likely intended.


31-125: RunPriceFeeder logic is significantly improved.

  1. Separating retry logic for GetParams() is more robust, preventing infinite blocks.
  2. The usage of fsMap and feeders to manage token-based feeders is logical.

127-141: getOracleParamsWithMaxRetry approach is valid but watch for extended downtime.
After 43200 attempts, function gives up. This equates to a 24-hour cycle. Confirm that this is acceptable from a user perspective.


144-156: ResetAllStakerValidators helps maintain data integrity.
Ensuring all stakerInfos are re-initialized if partial updates fail is a defensive approach for on-chain states.

fetcher/fetcher.go (8)

4-8: Imports look valid and necessary.
No immediate concerns with adding these packages.


16-18: Constants for logger tags
Using separate constants for loggerTag and loggerTagPrefix is clear, improves readability.


27-36: Fetcher struct concurrency
The use of channels and a mutex suggests concurrency. Ensure fields like running, priceReadList are always accessed under the locker to avoid data races.


54-56: Helper function newGetLatestPriceReq
The function is simple, returning both request and response channel. Fine as is.


72-81: AddTokenForSource blocking
This approach is generally fine. Ensure that the channel-based request does not block indefinitely if the fetcher is never started.


108-114: GetLatestPrice
The function is straightforward. The blocking receive from f.getLatestPrice can stall if Start() hasn't been called. Consider returning an error if f.running is false.


117-135: Start method concurrency
You initialize and store priceReadList after calling source.Start(). That’s good. But be sure each source can handle multiple starts or repeated calls. Consider clarifying that source.Start() can be invoked only once.


190-200: Stop method
The double-check pattern in the select block ensures we only close the channel once. Looks good.

fetcher/beaconchain/types.go (11)

5-7: Imports
Adding packages for big integers, hex, and JSON is consistent with the new functionality.


15-20: Renamed or revised imports
Ensuring consistent naming helps readability. The approach here looks fine, but double-check for import collisions.


39-42: stakerVList definition
Storing a map of staker -> validatorList behind an RWMutex is good for concurrency.


44-49: newStakerVList
Straightforward constructor.


61-74: getStakerValidators
Copies the validator slice to avoid direct reference exposure. Good practice.


76-95: addVIdx logic
Ensures index continuity. If index doesn't match the expected increment, returns false. This is a simple concurrency approach, but ensure the calling side properly handles false responses.


96-116: removeVIdx logic
Similar concurrency concerns as addVIdx. The approach to remove a staker if no validators remain is consistent.


118-147: reset method
Re-initializes the entire staker map if all is true. This is a big re-population step. Make sure calling code is aware of possibly discarding partial changes.


152-152: hexPrefix
Constant usage is straightforward.


161-161: SourceInitializers
This approach is flexible but ensure you handle collisions or re-entries if initBeaconchain is called multiple times.


241-247: convertHexToIntStr
Converts hex to big.Int and returns its string. Straightforward.

fetcher/beaconchain/beaconchain.go (2)

17-17: feedertypes import
No issues with referencing the shared feedertypes.


Line range hint 212-219: getFinalizedEpoch
Exits early on error, ensures stateRoot is updated. If the finalization is partial, consider clarifying logs.

exoclient/subscribe.go (9)

13-14: New type definitions
subEvent and eventQuery are good for type safety.


17-26: Constants
Defining queries (eNewBlock, eTxUpdatePrice, etc.) is more explicit and less error-prone than scattered strings.


30-43: events map
Resets to true before each subscription attempt. This design is flexible but ensure you handle concurrency if multiple routines manipulate events.


110-131: connectWs
Retries up to maxRetry times. The time.Sleep(3s) approach is simple. The function sets up a fresh wsStop channel each time. Make sure to handle partial state if the connection fails repeatedly.


133-144: StopWsRoutines
Closing ec.wsStop ensures the read/ping goroutines stop. This approach is consistent.


157-167: decreaseWsRountines
Similarly checks ec.wsActive. The code ensures the routine count does not go below zero. Looks correct.


169-173: isZeroWsRoutines
Simply checks if the count is 0. Straightforward.


176-188: markWsActive / markWsInactive
Toggling a boolean under lock is standard. Code looks fine.


320-322: resetEvents
Sets all events to true again. Straightforward but watch concurrency.

exoclient/types.go (10)

7-23: Imports and references
The imports align with new event-handling logic (errors, fmt, strings). The usage is consistent.


28-40: ExoClientInf & EventInf
Clearly defining interfaces is a good design. The ExoClientInf covers queries, transactions, and subscription.


42-85: EventNewBlock
Holds new block data (height, feederIDs, etc.). The helper methods and GetEventNewBlock parse the events thoroughly.


90-109: FinalPrice
Simple accessor methods. Overall correct usage for storing token price info.


110-137: EventUpdatePrice
Captures final prices. The GetEventUpdatePrice method returns an error if finalPrice is unavailable. This is good error handling.


139-191: EventUpdateNST
Represents deposit or removal of a beaconchain validator. nstChange is split and validated. All good, though watch out for partial parse scenarios if the string format changes.


193-205: EventType, EventRes
Enumerating event types here ensures clarity across the codebase.


207-219: SubscribeResult
Represents the structured response from websocket events. Maintaining event references in a single place is helpful.


235-345: SubscribeResult methods
Parsing logic for heights, finalPrice, feederIDs, NSTChange is robust. Logs errors on parse failure. This is good.


Line range hint 347-423: Constants & Init
ChainID checks, private key loading from either the mnemonic or file. The fallback approach is flexible. Successfully returns an error if anything fails.

fetcher/types/types.go (1)

63-65: Validate concurrency with the PriceSync struct.

Lock-based concurrency around a pointer to PriceInfo is correct, but please verify that all read/write paths lock appropriately. If any code updates PriceInfo outside of these methods, you risk concurrent access. Consider making info private to enforce usage through the accessor methods only.

cmd/types.go (1)

14-17: Validate usage of priceFetcher interface.

The AddTokenForSource method might block or fail if the source is not started. Ensure callers are prepared for error conditions or a delayed token addition. You might want to clarify expected usage in the docstring or handle potential concurrency pitfalls in the implementations.

exoclient/query.go (2)

5-5: Explicit import usage check
It looks like "fmt" is introduced for formatted error messages and printing. It's being used effectively in the new error returns, so no issues here.


21-24: Return typed zero-value object upon failures
The approach of returning an empty PriceTimeRound object plus a wrapped error is consistent. However, confirm with upstream code that an empty PriceTimeRound is handled correctly, since partial or stale data might cause logical errors if not properly validated.

types/types.go (2)

83-83: Disable stack traces with caution
config.EncoderConfig.StacktraceKey = "" hides stack traces. This can complicate debugging. Confirm you truly need to suppress them in production logs, or consider a debug-level toggle.


154-157: Improved error definitions
Adding distinct error variables for initialization and token configuration is a helpful improvement. When creating new top-level errors, ensure they're consistently wrapped and handled appropriately in the code to avoid losing context.

fetcher/chainlink/chainlink.go Outdated Show resolved Hide resolved
fetcher/chainlink/chainlink.go Outdated Show resolved Hide resolved
fetcher/fetcher.go Show resolved Hide resolved
fetcher/beaconchain/beaconchain.go Show resolved Hide resolved
exoclient/subscribe.go Show resolved Hide resolved
fetcher/types/types.go Show resolved Hide resolved
cmd/types.go Show resolved Hide resolved
cmd/types.go Show resolved Hide resolved
types/config_test.go Show resolved Hide resolved
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🔭 Outside diff range comments (1)
fetcher/beaconchain/beaconchain.go (1)

Line range hint 293-316: Improve error handling and logging consistency in getValidators.

The function uses the global logger instead of the source logger and could benefit from better error handling.

Consider these improvements:

-func getValidators(validators []string, stateRoot string) ([][]uint64, error) {
+func (s *source) getValidators(validators []string, stateRoot string) ([][]uint64, error) {
     reqBody := ValidatorPostRequest{
         IDs: validators,
     }
     body, _ := json.Marshal(reqBody)
     u := urlEndpoint.JoinPath(fmt.Sprintf(getValidatorsPath, stateRoot))
     res, err := http.Post(u.String(), "application/json", bytes.NewBuffer(body))
     if err != nil {
-        logger.Error("failed to get validators from beaconchain", "error", err)
+        s.logger.Error("failed to get validators from beaconchain", "error", err)
         return nil, err
     }
     defer res.Body.Close()
-    result, _ := io.ReadAll(res.Body)
+    result, err := io.ReadAll(res.Body)
+    if err != nil {
+        s.logger.Error("failed to read response body", "error", err)
+        return nil, fmt.Errorf("failed to read response body: %w", err)
+    }
♻️ Duplicate comments (1)
fetcher/beaconchain/beaconchain.go (1)

85-92: 🛠️ Refactor suggestion

Consider enhancing error reporting in validator management functions.

The current implementation returns boolean values which might not provide enough context about failures.

Consider returning error values instead of booleans to provide more context about failures. For example:

-func UpdateStakerValidators(stakerIdx int, validatorIndexHex string, index uint64, deposit bool) bool {
+func UpdateStakerValidators(stakerIdx int, validatorIndexHex string, index uint64, deposit bool) error {
     if deposit {
-        return defaultStakerValidators.addVIdx(stakerIdx, validatorIndexHex, index)
+        if !defaultStakerValidators.addVIdx(stakerIdx, validatorIndexHex, index) {
+            return fmt.Errorf("failed to add validator: invalid index %d for staker %d", index, stakerIdx)
+        }
+        return nil
     }
-    return defaultStakerValidators.removeVIdx(stakerIdx, validatorIndexHex, index)
+    if !defaultStakerValidators.removeVIdx(stakerIdx, validatorIndexHex, index) {
+        return fmt.Errorf("failed to remove validator: invalid index %d for staker %d", index, stakerIdx)
+    }
+    return nil
 }
🧹 Nitpick comments (11)
fetcher/chainlink/types.go (3)

19-24: Add documentation for the source struct.

Consider adding documentation to explain the purpose of the struct and its fields, which will help other developers understand its role in the chainlink integration.

+// source implements the types.SourceInf interface for Chainlink price feeds.
+// It manages the connection to Ethereum clients and aggregates price data from Chainlink oracles.
 type source struct {
+    // logger handles structured logging for the source
     logger feedertypes.LoggerInf
+    // Source contains the base implementation of the price source
     *types.Source
+    // chainlinkProxy manages connections to Chainlink price feed contracts
     chainlinkProxy *proxy
+    // clients maps network names to their respective Ethereum clients
     clients map[string]*ethclient.Client
 }

63-76: Improve documentation and error handling in addClient.

The method contains commented-out lock operations without explanation. Additionally, error handling could be more descriptive.

Apply these improvements:

+// addClient adds an Ethereum client for the specified network if it doesn't exist.
+// This method is designed to be called during initialization before the proxy becomes
+// concurrent, thus not requiring locks.
 func (p *proxy) addClient(network, url string) error {
-    //  p.locker.Lock()
-    //  defer p.locker.Unlock()
     var err error
     if _, ok := p.clients[network]; !ok {
         if len(url) == 0 {
-            return errors.New("url is empty")
+            return fmt.Errorf("invalid client configuration: network=%s url=empty", network)
         }
         p.clients[network], err = ethclient.Dial(url)
     }
     return err
 }

96-130: Improve error handling and resource cleanup in initChainlink.

Consider adding cleanup of created clients when an error occurs during initialization to prevent resource leaks.

 func initChainlink(cfgPath string, l feedertypes.LoggerInf) (types.SourceInf, error) {
+    var clients []*ethclient.Client
+    cleanup := func() {
+        for _, client := range clients {
+            client.Close()
+        }
+    }
+
     if logger = l; logger == nil {
         if logger = feedertypes.GetLogger("fetcher_chainlink"); logger == nil {
             return nil, feedertypes.ErrInitFail.Wrap("logger is not initialized")
         }
     }
     cfg, err := parseConfig(cfgPath)
     if err != nil {
         return nil, feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to parse config file, path;%s, error:%s", cfgPath, err))
     }
     
     defaultSource = &source{
         logger:         logger,
         clients:        make(map[string]*ethclient.Client),
         chainlinkProxy: newProxy(),
     }
     
     for network, url := range cfg.URLs {
         if len(url) == 0 {
             return nil, feedertypes.ErrInitFail.Wrap(fmt.Sprintf("rpcURL from config is empty, config_file:%s", envConf))
         }
         network = strings.ToLower(network)
         err = defaultSource.chainlinkProxy.addClient(network, url)
         if err != nil {
+            cleanup()
             return nil, feedertypes.ErrInitFail.Wrap(fmt.Sprintf("fail to initialize ethClient, url:%s, error:%s", url, err))
         }
+        if client := defaultSource.chainlinkProxy.clients[network]; client != nil {
+            clients = append(clients, client)
+        }
     }
     
     if err = defaultSource.chainlinkProxy.addToken(cfg.Tokens); err != nil {
+        cleanup()
         return nil, feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to add chainlinkPriceFeedProxy for token:%s, error:%v", cfg.Tokens, err))
     }
     
     defaultSource.Source = types.NewSource(logger, types.Chainlink, defaultSource.fetch, cfgPath, defaultSource.reload)
     return defaultSource, nil
 }
fetcher/fetcher.go (1)

25-34: Add documentation and organize fields in the Fetcher struct.

The struct would benefit from documentation and logical grouping of fields.

+// Fetcher manages price data collection from multiple sources.
+// It handles concurrent access to price data and provides methods
+// to add new tokens and retrieve latest prices.
 type Fetcher struct {
+    // Core components
     logger  feedertypes.LoggerInf
     locker  *sync.Mutex
     running bool
+
+    // Data storage
     sources map[string]types.SourceInf
-    // source->map{token->price}
     priceReadList  map[string]map[string]*types.PriceSync
+
+    // Communication channels
     addSourceToken chan *addTokenForSourceReq
     getLatestPrice chan *getLatestPriceReq
     stop           chan struct{}
 }
fetcher/beaconchain/types.go (2)

39-42: Consider improving error handling in reset method.

The stakerVList implementation is thread-safe, but there are a few improvements to consider:

  1. The error message in line 129 uses both fmt.Errorf and fmt.Sprintf unnecessarily.
  2. The TODO comment suggests potential validation gaps.

Apply this diff to improve error handling:

-      return fmt.Errorf(fmt.Sprintf("failed to convert validatorIndex from hex string to int, validator-index-hex:%s", validatorIndexHex))
+      return fmt.Errorf("failed to convert validatorIndex from hex string to int, validator-index-hex:%s", validatorIndexHex)

Also applies to: 118-147


177-180: Consider returning a specific error for unsupported operations.

Instead of returning nil, consider returning an "operation not supported" error to be more explicit.

Apply this diff:

 func (s *source) reload(token, cfgPath string) error {
-    return nil
+    return fmt.Errorf("reload operation not supported for beaconchain source")
 }
fetcher/beaconchain/beaconchain.go (1)

Line range hint 95-176: Consider breaking down the fetch method for better maintainability.

The fetch method handles multiple responsibilities including:

  • Fetching finalized epoch
  • Processing validator balances
  • Calculating changes

Consider extracting these responsibilities into separate methods:

+func (s *source) processValidatorBalances(validators []string, stateRoot string) (int, error) {
+    stakerBalance := 0
+    // Process validators in batches of 100
+    for i := 0; i < len(validators); i += 100 {
+        end := i + 100
+        if end > len(validators) {
+            end = len(validators)
+        }
+        validatorBalances, err := getValidators(validators[i:end], stateRoot)
+        if err != nil {
+            return 0, fmt.Errorf("failed to get validators from beaconchain, error:%w", err)
+        }
+        for _, validatorBalance := range validatorBalances {
+            stakerBalance += int(validatorBalance[1])
+        }
+    }
+    return stakerBalance, nil
+}
exoclient/types.go (1)

Line range hint 364-431: Consider breaking down the Init function for better maintainability.

The Init function handles multiple responsibilities including:

  • Logger initialization
  • Configuration validation
  • Private key loading
  • Client initialization

Consider extracting the private key loading logic into a separate function:

+func loadPrivateKey(mnemonic, privFile, confPath string) (cryptotypes.PrivKey, error) {
+    if len(mnemonic) > 0 {
+        if !bip39.IsMnemonicValid(mnemonic) {
+            return nil, fmt.Errorf("invalid mnemonic: %s", mnemonic)
+        }
+        return ed25519.GenPrivKeyFromSecret([]byte(mnemonic)), nil
+    }
+
+    file, err := os.Open(path.Join(confPath, privFile))
+    if err != nil {
+        return nil, fmt.Errorf("failed to open consensuskey file, path: %s, error: %v", privFile, err)
+    }
+    defer file.Close()
+
+    var privKey feedertypes.PrivValidatorKey
+    if err := json.NewDecoder(file).Decode(&privKey); err != nil {
+        return nil, fmt.Errorf("failed to parse consensuskey from json file, file path: %s, error: %v", privFile, err)
+    }
+
+    privBytes, err := base64.StdEncoding.DecodeString(privKey.PrivKey.Value)
+    if err != nil {
+        return nil, fmt.Errorf("failed to parse privatekey from base64_string: %s, error: %v", privKey.PrivKey.Value, err)
+    }
+
+    return &ed25519.PrivKey{
+        Key: cryptoed25519.PrivateKey(privBytes),
+    }, nil
+}
fetcher/types/types.go (2)

365-400: Implement backoff mechanism for failed price fetches.

The startFetchToken method could benefit from:

  1. Implementing an exponential backoff for failed price fetches
  2. Addressing the TODO comment about maximum fails

Consider implementing a backoff mechanism:

 func (s *Source) startFetchToken(token *tokenInfo) {
 	s.activeTokenCount.Add(1)
 	token.SetActive(true)
+	backoff := time.Second
+	maxBackoff := time.Minute
+	consecutiveFails := 0
+	maxConsecutiveFails := 5
 	go func() {
 		defer func() {
 			token.SetActive(false)
 			s.activeTokenCount.Add(-1)
 		}()
 		tic := time.NewTicker(s.interval)
 		for {
 			select {
 			case <-s.stop:
 				s.logger.Info("exist fetching routine", "source", s.name, "token", token)
 				return
 			case <-tic.C:
 				if price, err := s.fetch(token.name); err != nil {
+					consecutiveFails++
+					if consecutiveFails >= maxConsecutiveFails {
+						s.logger.Error("maximum consecutive failures reached", "source", s.name, "token", token.name)
+						return
+					}
+					time.Sleep(backoff)
+					backoff = min(backoff*2, maxBackoff)
 				} else {
+					consecutiveFails = 0
+					backoff = time.Second
 					updated := token.price.Update(*price)

132-140: Consider memory management in Update method.

The Update method creates a new copy of PriceInfo on each call, even when the price hasn't changed. For high-frequency updates, this could lead to unnecessary memory allocations.

Consider checking equality before creating a copy:

 func (p *PriceSync) Update(price PriceInfo) (updated bool) {
 	p.lock.Lock()
-	if !price.EqualPrice(*p.info) {
+	if p.info != nil && !price.EqualPrice(*p.info) {
 		*p.info = price
 		updated = true
 	}
 	p.lock.Unlock()
 	return
 }
cmd/types.go (1)

396-403: Add context for cancellation in Trigger method.

The Trigger method lacks context for cancellation and timeout management.

Consider adding context:

-func (fs *Feeders) Trigger(height int64, feederIDs map[int64]struct{}) {
+func (fs *Feeders) Trigger(ctx context.Context, height int64, feederIDs map[int64]struct{}) {
 	select {
 	case fs.trigger <- &triggerReq{height: height, feederIDs: feederIDs}:
+	case <-ctx.Done():
+		fs.logger.Error("trigger cancelled", "error", ctx.Err())
 	default:
 	}
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b11b1dc and 66bb064.

📒 Files selected for processing (8)
  • cmd/feeder_tool.go (1 hunks)
  • cmd/types.go (1 hunks)
  • exoclient/types.go (2 hunks)
  • fetcher/beaconchain/beaconchain.go (7 hunks)
  • fetcher/beaconchain/types.go (4 hunks)
  • fetcher/chainlink/types.go (2 hunks)
  • fetcher/fetcher.go (1 hunks)
  • fetcher/types/types.go (1 hunks)
🔇 Additional comments (5)
fetcher/beaconchain/types.go (2)

24-27: LGTM! Improved initialization and error handling.

The source struct and initialization code have been enhanced with:

  • Proper encapsulation of logger and Source fields
  • Better error messages using fmt.Sprintf
  • Consistent error handling patterns

Also applies to: 164-228


243-250: LGTM! Clean utility function implementation.

The convertHexToIntStr function is well-implemented with proper error handling.

exoclient/types.go (1)

28-44: LGTM! Well-designed interfaces.

The interfaces are well-structured with clear method signatures and good separation of concerns.

fetcher/types/types.go (2)

22-46: ⚠️ Potential issue

Add validation and thread safety to signInfo.

The signInfo struct has potential issues:

  1. No validation for maxNonce in initialization
  2. Potential race conditions in nonce management

Consider adding validation and mutex:

 type signInfo struct {
+	lock     sync.Mutex
 	maxNonce int32
 	roundID  int64
 	nonce    int32
 }

 func (s *signInfo) getNextNonceAndUpdate(roundID int64) int32 {
+	s.lock.Lock()
+	defer s.lock.Unlock()
 	if roundID < s.roundID {
 		return -1
 	} else if roundID > s.roundID {

Likely invalid or redundant comment.


257-322: 🛠️ Refactor suggestion

Enhance error handling and retry mechanisms in Start().

The method has several areas that could be improved:

  1. When source is already running, it silently returns nil instead of a descriptive error.
  2. No retry mechanism for failed token reloads.

Consider implementing:

 func (s *Source) Start() map[string]*PriceSync {
 	s.locker.Lock()
 	if s.running {
-		s.logger.Error("failed to start the source which is already running", "source", s.name)
-		s.locker.Unlock()
-		return nil
+		s.locker.Unlock()
+		return fmt.Errorf("source %s is already running", s.name)
 	}

Likely invalid or redundant comment.

fetcher/chainlink/types.go Outdated Show resolved Hide resolved
fetcher/fetcher.go Outdated Show resolved Hide resolved
cmd/feeder_tool.go Outdated Show resolved Hide resolved
cmd/feeder_tool.go Outdated Show resolved Hide resolved
exoclient/types.go Outdated Show resolved Hide resolved
cmd/types.go Show resolved Hide resolved
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

♻️ Duplicate comments (1)
cmd/feeder_tool.go (1)

43-49: ⚠️ Potential issue

Replace panic with proper error handling.

The function should return errors instead of using panic. This is a duplicate of a previous review comment.

-func RunPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mnemonic string, sourcesPath string, standalone bool) {
+func RunPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mnemonic string, sourcesPath string, standalone bool) error {
     if logger = feedertypes.SetLogger(logger); logger == nil {
-        panic("logger is not initialized")
+        return fmt.Errorf("logger initialization failed")
     }
     if err := initComponents(logger, conf, standalone); err != nil {
         logger.Error("failed to initialize components")
-        panic(err)
+        return fmt.Errorf("component initialization failed: %w", err)
     }
🧹 Nitpick comments (15)
fetcher/chainlink/chainlink.go (1)

66-89: Add success logging for better observability.

Consider adding debug/info level logging when a token is successfully added during reload.

 			if err := s.chainlinkProxy.addToken(map[string]string{tName: tContract}); err != nil {
 				s.logger.Error("failed to add proxy when do reload", "source", s.GetName(), "token", tName, "error", err)
+			} else {
+				s.logger.Debug("successfully added token during reload", "source", s.GetName(), "token", tName)
 			}
types/types.go (1)

40-52: Consider security implications of mnemonic storage.

While the Config struct is well-organized, storing the mnemonic in configuration poses security risks. Consider:

  1. Using environment variables or secure secret management
  2. Adding encryption for the mnemonic when stored in config
fetcher/fetcher.go (5)

26-50: Consider making channel buffer sizes configurable.

While the channel-based design is good, the hardcoded buffer sizes (5) might need adjustment based on load. Consider making these configurable.


123-123: Consider making timeout duration configurable.

The hardcoded timeout of 5 seconds might need adjustment based on network conditions and load.


136-137: Address the TODO comment about cleaning logs.

The TODO indicates a need to clean up logging. Consider implementing structured logging with appropriate log levels.

Would you like me to help implement a more structured logging approach?


211-216: Consider using a more robust string splitting approach.

The current string manipulation for source names might be fragile. Consider using a regular expression or dedicated parsing function for better reliability.

-		sNames := strings.Split(strings.Map(func(r rune) rune {
-			if unicode.IsSpace(r) {
-				return -1
-			}
-			return r
-		}, ts.Sources), ",")
+		sNames := strings.FieldsFunc(ts.Sources, func(r rune) bool {
+			return unicode.IsSpace(r) || r == ','
+		})

239-240: Consider dependency injection instead of global state.

Using a global defaultFetcher might make testing and maintaining multiple fetcher instances difficult. Consider passing the fetcher instance where needed instead.

cmd/feeder_tool.go (3)

19-28: Consider extracting magic numbers into named constants.

The retry configuration could be improved by:

  1. Extracting the magic number 43200 into a named constant
  2. Making the retry configuration more flexible
+const (
+    // defaultMaxRetryAttempts represents 12 hours worth of retries at 1-second intervals
+    defaultMaxRetryAttempts = 43200
+    defaultRetryInterval    = 2 * time.Second
+)

 type RetryConfig struct {
     MaxAttempts int
     Interval    time.Duration
 }

 // DefaultRetryConfig provides default retry settings
 var DefaultRetryConfig = RetryConfig{
-    MaxAttempts: 43200, // defaultMaxRetry
-    Interval:    2 * time.Second,
+    MaxAttempts: defaultMaxRetryAttempts,
+    Interval:    defaultRetryInterval,
 }

91-132: Extract event handling logic into separate functions.

The event handling logic should be extracted into separate functions for better maintainability and testability.

+func handleNewBlockEvent(e *exoclient.EventNewBlock, feeders *Feeders, ecClient exoclient.ExoClientInf, logger feedertypes.LoggerInf) error {
+    if e.ParamsUpdate() {
+        oracleP, err := getOracleParamsWithMaxRetry(DefaultRetryConfig.MaxAttempts, ecClient, logger)
+        if err != nil {
+            return fmt.Errorf("failed to get oracle params: %w", err)
+        }
+        feeders.UpdateOracleParams(oracleP)
+    }
+    feeders.Trigger(e.Height(), e.FeederIDs())
+    return nil
+}

+func handleUpdatePriceEvent(e *exoclient.EventUpdatePrice, feeders *Feeders, oracleP *oracletypes.Params, logger feedertypes.LoggerInf) error {
+    finalPrices := make([]*finalPrice, 0, len(e.Prices()))
+    for _, price := range e.Prices() {
+        feederIDList := oracleP.GetFeederIDsByTokenID(uint64(price.TokenID()))
+        if len(feederIDList) == 0 {
+            logger.Error("Failed to get feederIDs by tokenID", "tokenID", price.TokenID())
+            continue
+        }
+        feederID := feederIDList[len(feederIDList)-1]
+        finalPrices = append(finalPrices, &finalPrice{
+            feederID: int64(feederID),
+            price:    price.Price(),
+            decimal:  price.Decimal(),
+            roundID:  price.RoundID(),
+        })
+    }
+    feeders.UpdatePrice(e.TxHeight(), finalPrices)
+    return nil
+}

 for event := range ecClient.EventsCh() {
     switch e := event.(type) {
     case *exoclient.EventNewBlock:
-        if paramsUpdate := e.ParamsUpdate(); paramsUpdate {
-            oracleP, err = getOracleParamsWithMaxRetry(DefaultRetryConfig.MaxAttempts, ecClient, logger)
-            if err != nil {
-                fmt.Printf("Failed to get oracle params with maxRetry when params update detected, price-feeder will exit, error:%v", err)
-                return
-            }
-            feeders.UpdateOracleParams(oracleP)
+        if err := handleNewBlockEvent(e, feeders, ecClient, logger); err != nil {
+            logger.Error("Failed to handle new block event", "error", err)
+            return fmt.Errorf("new block event handling failed: %w", err)
         }
-        feeders.Trigger(e.Height(), e.FeederIDs())
     case *exoclient.EventUpdatePrice:
-        finalPrices := make([]*finalPrice, 0, len(e.Prices()))
-        // ... rest of the price update logic
+        if err := handleUpdatePriceEvent(e, feeders, oracleP, logger); err != nil {
+            logger.Error("Failed to handle price update event", "error", err)
+            return fmt.Errorf("price update event handling failed: %w", err)
         }

136-150: Enhance retry mechanism with exponential backoff.

The current retry mechanism uses a fixed interval. Consider implementing exponential backoff for better resilience.

+func getBackoffDuration(attempt int, baseInterval time.Duration) time.Duration {
+    if attempt <= 0 {
+        return baseInterval
+    }
+    // Calculate exponential backoff with jitter
+    backoff := baseInterval * time.Duration(1<<uint(attempt-1))
+    if backoff > 30*time.Second {
+        backoff = 30 * time.Second // Cap at 30 seconds
+    }
+    return backoff
+}

 func getOracleParamsWithMaxRetry(maxRetry int, ecClient exoclient.ExoClientInf, logger feedertypes.LoggerInf) (oracleP *oracletypes.Params, err error) {
     if maxRetry <= 0 {
         maxRetry = DefaultRetryConfig.MaxAttempts
     }
     for i := 0; i < maxRetry; i++ {
         oracleP, err = ecClient.GetParams()
         if err == nil {
             return
         }
-        logger.Error("Failed to get oracle params, retrying...", "count", i, "max", maxRetry, "error", err)
-        time.Sleep(DefaultRetryConfig.Interval)
+        backoff := getBackoffDuration(i, DefaultRetryConfig.Interval)
+        logger.Error("Failed to get oracle params, retrying...",
+            "attempt", i+1,
+            "maxAttempts", maxRetry,
+            "backoff", backoff.String(),
+            "error", err)
+        time.Sleep(backoff)
     }
     return
 }
exoclient/types.go (1)

279-320: Improve base64 validation in FinalPrice method.

The base64 validation is only performed for strings of length 32, which could miss invalid base64 strings of different lengths.

+func isValidBase64(s string) bool {
+    _, err := base64.StdEncoding.DecodeString(s)
+    return err == nil
+}

 func (s *SubscribeResult) FinalPrice() (prices []*FinalPrice, valid bool) {
     if fps := s.Result.Events.FinalPrice; len(fps) > 0 {
         prices = make([]*FinalPrice, 0, len(fps))
         for _, price := range fps {
             parsed := strings.Split(price, "_")
             if l := len(parsed); l > 4 {
                 // nsteth
                 parsed[2] = strings.Join(parsed[2:l-1], "_")
                 parsed[3] = parsed[l-1]
                 parsed = parsed[:4]
             }
-            if len(parsed[2]) == 32 {
-                // make sure this base64 string is valid
-                if _, err := base64.StdEncoding.DecodeString(parsed[2]); err != nil {
-                    logger.Error("failed to parse base64 encoded string when parse finalprice.price from SbuscribeResult", "parsed.price", parsed[2])
-                    return
-                }
+            // Validate base64 string regardless of length
+            if !isValidBase64(parsed[2]) {
+                logger.Error("failed to parse base64 encoded string when parse finalprice.price from SubscribeResult",
+                    "parsed.price", parsed[2],
+                    "length", len(parsed[2]))
+                return
             }
fetcher/types/types.go (2)

195-214: Clean up commented code and add documentation.

The Source struct contains commented-out fields and TODOs that should be addressed. Additionally, the struct could benefit from better documentation.

+// Source implements the SourceInf interface and provides a common implementation
+// for price fetching sources. It manages token initialization, price updates,
+// and concurrent access to price data.
 type Source struct {
     logger    feedertypes.LoggerInf
     cfgPath   string
     running   bool
     priceList map[string]*PriceSync
     name      string
     locker    *sync.Mutex
     stop      chan struct{}
-    // 'fetch' interacts directly with data source
+    // fetch interacts directly with data source to retrieve price information
     fetch            SourceFetchFunc
     reload           SourceReloadConfigFunc
     tokens           map[string]*tokenInfo
     activeTokenCount *atomic.Int32
     interval         time.Duration
     addToken         chan *addTokenReq
-    // pendingTokensCount *atomic.Int32
-    //	pendingTokensLimit int32
-    // used to trigger reloading source config
+    // tokenNotConfigured is used to trigger reloading source config
     tokenNotConfigured chan string
 }

327-349: Improve error handling in AddTokenAndStart method.

The method should use predefined error variables and improve its documentation.

+// ErrSourceNotRunning indicates that the source is not in a running state
+var ErrSourceNotRunning = errors.New("source not running")
+
+// ErrTooManyPendingTokens indicates that the token addition queue is full
+var ErrTooManyPendingTokens = errors.New("too many pending tokens")

+// AddTokenAndStart adds a token to a running source and starts fetching its price.
+// It returns a result object containing either a price synchronization object or
+// an error if the operation fails. The method is non-blocking and will return
+// ErrTooManyPendingTokens if the token addition queue is full.
 func (s *Source) AddTokenAndStart(token string) *addTokenRes {
     s.locker.Lock()
     defer s.locker.Unlock()
     if !s.running {
         return &addTokenRes{
             price: nil,
-            err:   fmt.Errorf("didn't add token due to source:%s not running", s.name),
+            err:   fmt.Errorf("%w: %s", ErrSourceNotRunning, s.name),
         }
     }
     
     addReq, addResCh := newAddTokenReq(token)
     select {
     case s.addToken <- addReq:
         return <-addResCh
     default:
     }
-    // TODO(leon): define an res-skipErr variable
     return &addTokenRes{
         price: nil,
-        err:   fmt.Errorf("didn't add token, too many pendings, limit:%d", defaultPendingTokensLimit),
+        err:   fmt.Errorf("%w: limit %d", ErrTooManyPendingTokens, defaultPendingTokensLimit),
     }
 }
cmd/types.go (2)

183-191: Simplify redundant price comparison logic.

The current implementation has redundant conditions for price comparison.

-        if len(price.Price) >= 32 && price.EqualToBase64Price(f.lastPrice.price) {
-            f.logger.Info("didn't submit price due to price not changed", "roundID", roundID, "delta", delta, "price", price)
-            f.logger.Debug("got latsetprice equal to local cache", "feeder", f.Info())
-            continue
-        } else if price.EqualPrice(f.lastPrice.price) {
+        if price.EqualToBase64Price(f.lastPrice.price) || price.EqualPrice(f.lastPrice.price) {
             f.logger.Info("didn't submit price due to price not changed", "roundID", roundID, "delta", delta, "price", price)
             f.logger.Debug("got latsetprice equal to local cache", "feeder", f.Info())
             continue
-        }

68-68: Address TODO comments for missing functionality.

Several TODO comments indicate incomplete implementations:

  1. Graceful shutdown mechanism
  2. Multiple source support
  3. Rule ID validation
  4. Oracle parameters update

Would you like me to:

  1. Generate implementations for these missing features?
  2. Create GitHub issues to track these tasks?

Please let me know which approach you prefer, and I'll help implement the missing functionality.

Also applies to: 71-71, 76-76, 227-227

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 66bb064 and 7ef38b3.

📒 Files selected for processing (8)
  • cmd/feeder_tool.go (1 hunks)
  • cmd/types.go (1 hunks)
  • exoclient/types.go (2 hunks)
  • fetcher/chainlink/chainlink.go (2 hunks)
  • fetcher/chainlink/types.go (2 hunks)
  • fetcher/fetcher.go (1 hunks)
  • fetcher/types/types.go (1 hunks)
  • types/types.go (3 hunks)
🔇 Additional comments (15)
fetcher/chainlink/chainlink.go (3)

11-12: LGTM! Import changes are well-structured.

The new imports are properly aliased and organized according to Go conventions.


19-40: LGTM! Well-structured error handling and type improvements.

The refactored method shows several improvements:

  1. Better error handling with specific error messages
  2. Appropriate use of int32 for cross-platform compatibility
  3. Proper token configuration validation

82-85: Enhance error handling for token addition.

The current implementation logs the error but continues execution. Consider whether this is the intended behavior or if we should return the error.

Please confirm if suppressing the error and continuing execution is the intended behavior.

types/types.go (4)

13-26: Complete the interface definition for PriceInfoInf.

The interface is currently commented out and methods lack parameter and return types. This needs to be completed to ensure proper type safety and documentation.

Would you like me to help define the complete interface with proper method signatures?


35-38: LGTM! Well-structured configuration mapping.

The TokenSources struct provides a clean way to map tokens to their sources with proper configuration tags.


83-83: Verify impact of disabled stack traces.

Setting StacktraceKey to empty string disables stack traces in logs. Ensure this won't impact debugging capabilities in production environments.


154-157: LGTM! Well-defined error types.

The error variables are well-defined with clear, specific messages that will help with debugging and error handling.

fetcher/chainlink/types.go (3)

19-24: LGTM! Well-structured source management.

The source struct provides a clean organization of chainlink-specific functionality while properly embedding the base Source type.


46-61: LGTM! Improved error handling and synchronization.

The method properly implements mutex locking and includes the improved error messages as suggested in previous reviews.


96-130: LGTM! Robust initialization process.

The initialization process is well-structured with proper error handling, logging, and configuration management.

fetcher/types/types.go (1)

260-322: 🛠️ Refactor suggestion

Improve error handling in Start method.

The method should return an error as suggested in the TODO comment. Additionally, the method could benefit from better documentation.

+// Start begins background routines to fetch prices for all registered tokens.
+// It returns a map of token names to their corresponding price synchronization
+// objects and an error if the source is already running or has no tokens.
-func (s *Source) Start() map[string]*PriceSync {
+func (s *Source) Start() (map[string]*PriceSync, error) {
     s.locker.Lock()
     if s.running {
         s.logger.Error("failed to start the source which is already running", "source", s.name)
         s.locker.Unlock()
-        return nil
+        return nil, fmt.Errorf("source %s is already running", s.name)
     }
     if len(s.tokens) == 0 {
         s.logger.Error("failed to start the source which has no tokens set", "source", s.name)
         s.locker.Unlock()
-        return nil
+        return nil, fmt.Errorf("source %s has no tokens configured", s.name)
     }
     s.running = true
     s.locker.Unlock()
     
     ret := make(map[string]*PriceSync)
     // ... rest of the implementation ...
-    return ret
+    return ret, nil
 }

Likely invalid or redundant comment.

cmd/types.go (4)

14-20: LGTM! Well-designed interfaces.

The interfaces follow the Interface Segregation Principle with clear, focused responsibilities.


346-393: ⚠️ Potential issue

Add error handling and prevent potential deadlock.

The Start method has several concerns:

  1. No error handling for channel operations
  2. Potential deadlock in updateParams handling
  3. No context for cancellation

Consider implementing proper error handling and context support:

 type Feeders struct {
+    ctx     context.Context
+    cancel  context.CancelFunc
     logger  feedertypes.LoggerInf
     // ... other fields
 }

-func (fs *Feeders) Start() {
+func (fs *Feeders) Start(ctx context.Context) error {
+    fs.ctx, fs.cancel = context.WithCancel(ctx)
     for _, f := range fs.feederMap {
-        f.start()
+        if err := f.start(); err != nil {
+            return fmt.Errorf("failed to start feeder: %w", err)
+        }
     }
     go func() {
         for {
             select {
+            case <-fs.ctx.Done():
+                return
             case params := <-fs.updateParams:
                 results := []chan *updateParamsRes{}
                 for _, f := range fs.feederMap {
-                    res := f.updateParams(params)
+                    select {
+                    case <-fs.ctx.Done():
+                        return
+                    case res := <-f.updateParams(params):
+                        results = append(results, res)
+                    }
                 }
                 // ... rest of the implementation

Likely invalid or redundant comment.


154-221: ⚠️ Potential issue

Implement graceful shutdown and improve error handling.

The goroutine management needs improvement:

  1. No way to stop the goroutine (noted in TODO)
  2. Potential goroutine leak
  3. Unbounded operations in select cases

Consider implementing a graceful shutdown mechanism:

 type feeder struct {
+    done     chan struct{}
     logger   feedertypes.LoggerInf
     // ... other fields
 }

 func (f *feeder) start() {
+    f.done = make(chan struct{})
     go func() {
+        defer func() {
+            close(f.priceCh)
+            close(f.heightsCh)
+            close(f.paramsCh)
+        }()
         for {
             select {
+            case <-f.done:
+                return
             case h := <-f.heightsCh:
                 // ... existing code
             }
         }
     }()
 }

+func (f *feeder) stop() {
+    if f.done != nil {
+        close(f.done)
+    }
+}

Likely invalid or redundant comment.


22-46: ⚠️ Potential issue

Add validation and handle edge cases in SignInfo.

  1. Validate maxNonce to ensure it's positive in the constructor
  2. Add overflow protection for nonce increment

Apply this diff to address the issues:

 type signInfo struct {
     maxNonce int32
     roundID  int64
     nonce    int32
 }
+
+func newSignInfo(maxNonce int32) (*signInfo, error) {
+    if maxNonce <= 0 {
+        return nil, fmt.Errorf("maxNonce must be positive, got %d", maxNonce)
+    }
+    return &signInfo{maxNonce: maxNonce}, nil
+}

 func (s *signInfo) getNextNonceAndUpdate(roundID int64) int32 {
     if roundID < s.roundID {
         return -1
     } else if roundID > s.roundID {
         s.roundID = roundID
         s.nonce = 1
         return 1
     }
-    if s.nonce = s.nonce + 1; s.nonce > s.maxNonce {
+    // Protect against overflow
+    if s.nonce >= s.maxNonce || s.nonce == math.MaxInt32 {
         s.nonce = s.maxNonce
         return -1
     }
+    s.nonce++
     return s.nonce
 }

Likely invalid or redundant comment.

fetcher/chainlink/chainlink.go Outdated Show resolved Hide resolved
fetcher/chainlink/chainlink.go Outdated Show resolved Hide resolved
fetcher/chainlink/chainlink.go Outdated Show resolved Hide resolved
fetcher/chainlink/types.go Show resolved Hide resolved
cmd/feeder_tool.go Outdated Show resolved Hide resolved
exoclient/types.go Show resolved Hide resolved
exoclient/types.go Outdated Show resolved Hide resolved
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🔭 Outside diff range comments (1)
exoclient/types.go (1)

Line range hint 359-426: Replace panic with proper error handling in Init function.

The function still uses panic for logger initialization. Consider returning an error instead:

 func Init(conf feedertypes.Config, mnemonic, privFile string, standalone bool) error {
     if logger = feedertypes.GetLogger("exoclient"); logger == nil {
-        panic("logger is not initialized")
+        return fmt.Errorf("logger initialization failed")
     }
♻️ Duplicate comments (1)
cmd/feeder_tool.go (1)

42-133: ⚠️ Potential issue

Replace remaining panic statements with proper error handling.

The function still uses panic for error handling in several places. Consider returning errors instead:

-func RunPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mnemonic string, sourcesPath string, standalone bool) {
+func RunPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mnemonic string, sourcesPath string, standalone bool) error {
     if logger = feedertypes.SetLogger(logger); logger == nil {
-        panic("logger is not initialized")
+        return fmt.Errorf("logger initialization failed")
     }

     if err := initComponents(logger, conf, standalone); err != nil {
         logger.Error("failed to initialize components")
-        panic(err)
+        return fmt.Errorf("component initialization failed: %w", err)
     }

     f, _ := fetcher.GetFetcher()
     logger.Info("start fetching prices from all sources")
     if err := f.Start(); err != nil {
-        panic(fmt.Sprintf("failed to start Fetcher, error:%v", err))
+        return fmt.Errorf("failed to start Fetcher: %w", err)
     }

     oracleP, err := getOracleParamsWithMaxRetry(DefaultRetryConfig.MaxAttempts, ecClient, logger)
     if err != nil {
-        panic(fmt.Sprintf("failed to get initial oracle params: %v", err))
+        return fmt.Errorf("failed to get initial oracle params: %w", err)
     }
🧹 Nitpick comments (4)
fetcher/chainlink/chainlink.go (2)

26-32: Consider adding timeout context for external calls.

The LatestRoundData and Decimals calls are using default CallOpts without timeout context, which could potentially hang in case of network issues.

+   ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+   defer cancel()
-   roundData, err := chainlinkPriceFeedProxy.LatestRoundData(&bind.CallOpts{})
+   roundData, err := chainlinkPriceFeedProxy.LatestRoundData(&bind.CallOpts{Context: ctx})
    if err != nil {
        return nil, fmt.Errorf("failed to get LatestRoundData of token:%s from chainlink, error:%w", token, err)
    }

-   decimals, err := chainlinkPriceFeedProxy.Decimals(&bind.CallOpts{})
+   decimals, err := chainlinkPriceFeedProxy.Decimals(&bind.CallOpts{Context: ctx})

72-77: Consider adding validation for network URLs.

The URL validation for network clients should be performed before attempting to create the client.

    for network, url := range cfg.URLs {
        network = strings.ToLower(network)
+       if !strings.HasPrefix(url, "http://") && !strings.HasPrefix(url, "https://") {
+           return fmt.Errorf("invalid URL format for network %s: %s", network, url)
+       }
        if err := s.chainlinkProxy.addClient(network, url); err != nil {
cmd/feeder_tool.go (2)

19-28: Document the magic number in DefaultRetryConfig.

The MaxAttempts value of 43200 appears to be a magic number. Consider adding a comment explaining why this specific value was chosen (e.g., if it represents 24 hours of retries with a 2-second interval).


185-185: Fix typo in error message.

There's a typo in the error message: error;%w should be error:%w.

-            return fmt.Errorf("failed to init exoclient, error;%w", err)
+            return fmt.Errorf("failed to init exoclient, error:%w", err)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7ef38b3 and 9052596.

📒 Files selected for processing (3)
  • cmd/feeder_tool.go (1 hunks)
  • exoclient/types.go (2 hunks)
  • fetcher/chainlink/chainlink.go (2 hunks)
🔇 Additional comments (9)
fetcher/chainlink/chainlink.go (4)

11-12: LGTM! Import changes align with the refactoring.

The new imports correctly support the type changes mentioned in the AI summary, where PriceInfo type is now from fetchertypes package.


19-22: LGTM! Improved error handling for token configuration.

The error message is now clearer and more consistent with the codebase standards.


66-70: LGTM! Proper error wrapping in config parsing.

The error handling follows good practices by wrapping the original error for better debugging context.


88-88: Fix typo in error message.

The error message has a typo that was identified in past reviews.

-   return errors.New("token not found in reloaded config file")
+   return fmt.Errorf("token %q not found in reloaded config file", token)
cmd/feeder_tool.go (1)

136-150: LGTM! Well-structured retry mechanism.

The function implements a clean retry mechanism with proper error handling and logging.

exoclient/types.go (4)

28-186: LGTM! Well-structured interfaces and event handling.

The interfaces and event structs are well-defined with clear responsibilities and comprehensive event handling.


276-317: LGTM! Robust price parsing implementation.

The price parsing logic is thorough with proper handling of:

  • Special cases for nsteth
  • Base64 validation
  • Error conditions

385-423: LGTM! Improved error messages with better context.

The error messages have been improved to include more context and better descriptions, making debugging easier.


253-274: Review the logic for setting valid flag in FeederIDs method.

The method unconditionally sets valid = true at the end, which might mask cases where no feeder IDs were found. While this might be intentional (as indicated by the comment), consider if this is the desired behavior.

✅ Verification successful

The unconditional valid = true is intentional and correct

After analyzing the code usage, the current behavior is correct:

  • The method's comment explicitly states: // FeederIDs will return (nil, true) when there's no feederIDs
  • The usage in GetEventNewBlock() shows that the caller checks both the returned map and the valid flag
  • The unconditional valid = true at the end is consistent with the documented behavior, indicating that absence of feeder IDs is a valid state rather than an error condition
  • The method properly handles error cases (like parsing errors) by returning (nil, false) early
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for usages of FeederIDs() to understand the impact
ast-grep --pattern 'FeederIDs()' -A 5

Length of output: 39


Script:

#!/bin/bash
# Search for usages of FeederIDs method with ripgrep for better pattern matching
rg "FeederIDs\(\)" -A 5

# Also search for any references to feederIDs to understand the context
rg "feederIDs" -A 5

Length of output: 4950

fetcher/chainlink/chainlink.go Outdated Show resolved Hide resolved
@leonz789 leonz789 merged commit c4500c6 into develop Dec 27, 2024
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant