Skip to content

Commit

Permalink
One EC client instead of two
Browse files Browse the repository at this point in the history
  • Loading branch information
vsuharnikov committed Sep 5, 2024
1 parent c2b2261 commit fc21d01
Show file tree
Hide file tree
Showing 27 changed files with 117 additions and 164 deletions.
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,5 @@ buildTarballsForDocker := {
baseDirectory.value / "docker" / "target" / "consensus-client.tgz"
)
}

Test / unmanagedJars += file("src/test/lib/waves-all.jar")
2 changes: 1 addition & 1 deletion src/main/scala/units/Bridge.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.wavesplatform.utils.EthEncoding
import org.web3j.abi.datatypes.Event
import org.web3j.abi.datatypes.generated.{Bytes20, Int64}
import org.web3j.abi.{FunctionReturnDecoder, TypeEncoder, TypeReference}
import units.client.http.model.GetLogsResponseEntry
import units.client.engine.model.GetLogsResponseEntry
import units.eth.Gwei

import java.math.BigInteger
Expand Down
7 changes: 1 addition & 6 deletions src/main/scala/units/ConsensusClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import net.ceedubs.ficus.Ficus.*
import org.slf4j.LoggerFactory
import sttp.client3.HttpClientSyncBackend
import units.client.engine.{EngineApiClient, HttpEngineApiClient}
import units.client.http.{EcApiClient, HttpEcApiClient}
import units.client.{JwtAuthenticationBackend, LoggingBackend}
import units.network.*

Expand All @@ -28,7 +27,6 @@ class ConsensusClient(
config: ClientConfig,
context: ExtensionContext,
engineApiClient: EngineApiClient,
httpApiClient: EcApiClient,
blockObserver: BlocksObserver,
allChannels: DefaultChannelGroup,
globalScheduler: Scheduler,
Expand All @@ -42,7 +40,6 @@ class ConsensusClient(
deps.config,
context,
deps.engineApiClient,
deps.httpApiClient,
deps.blockObserver,
deps.allChannels,
deps.globalScheduler,
Expand All @@ -54,7 +51,6 @@ class ConsensusClient(

private[units] val elu =
new ELUpdater(
httpApiClient,
engineApiClient,
context.blockchain,
context.utx,
Expand Down Expand Up @@ -123,14 +119,13 @@ class ConsensusClientDependencies(context: ExtensionContext) extends AutoCloseab
httpClientBackend
}
)
val httpApiClient = new HttpEcApiClient(config, httpClientBackend)

val allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)
val peerDatabase = new PeerDatabaseImpl(config.network)
val messageObserver = new MessageObserver()
private val networkServer = NetworkServer(
config,
new HistoryReplier(httpApiClient, engineApiClient)(globalScheduler),
new HistoryReplier(engineApiClient)(globalScheduler),
peerDatabase,
messageObserver,
allChannels,
Expand Down
27 changes: 12 additions & 15 deletions src/main/scala/units/ELUpdater.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import units.client.engine.EngineApiClient
import units.client.engine.EngineApiClient.PayloadId
import units.client.engine.model.*
import units.client.engine.model.Withdrawal.WithdrawalIndex
import units.client.http.EcApiClient
import units.client.http.model.EcBlock
import units.eth.{EmptyL2Block, EthAddress, EthereumConstants}
import units.network.BlocksObserverImpl.BlockWithChannel
import units.util.HexBytesConverter
Expand All @@ -45,7 +43,6 @@ import scala.concurrent.duration.*
import scala.util.*

class ELUpdater(
httpApiClient: EcApiClient,
engineApiClient: EngineApiClient,
blockchain: Blockchain,
utx: UtxPool,
Expand Down Expand Up @@ -265,7 +262,7 @@ class ELUpdater(
fixedFinalizedBlock = if (finalizedBlock.height > rollbackBlock.parentBlock.height) rollbackBlock.parentBlock else finalizedBlock
_ <- confirmBlock(rollbackBlock.hash, fixedFinalizedBlock.hash)
_ <- confirmBlock(target, fixedFinalizedBlock)
lastEcBlock <- httpApiClient.getLastExecutionBlock
lastEcBlock <- engineApiClient.getLastExecutionBlock
_ <- Either.cond(
targetHash == lastEcBlock.hash,
(),
Expand Down Expand Up @@ -451,14 +448,14 @@ class ELUpdater(
else {
val finalizedBlock = chainContractClient.getFinalizedBlock
logger.debug(s"Finalized block is ${finalizedBlock.hash}")
httpApiClient.getBlockByHash(finalizedBlock.hash) match {
engineApiClient.getBlockByHash(finalizedBlock.hash) match {
case Left(error) => logger.error(s"Could not load finalized block", error)
case Right(Some(finalizedEcBlock)) =>
logger.trace(s"Finalized block ${finalizedBlock.hash} is at height ${finalizedEcBlock.height}")
(for {
newEpochInfo <- calculateEpochInfo
mainChainInfo <- chainContractClient.getMainChainInfo.toRight("Can't get main chain info")
lastEcBlock <- httpApiClient.getLastExecutionBlock.leftMap(_.message)
lastEcBlock <- engineApiClient.getLastExecutionBlock.leftMap(_.message)
} yield {
logger.trace(s"Following main chain ${mainChainInfo.id}")
val fullValidationStatus = FullValidationStatus(
Expand Down Expand Up @@ -620,7 +617,7 @@ class ELUpdater(
val finalizedBlock = chainContractClient.getFinalizedBlock
val options = chainContractClient.getOptions
logger.debug(s"Finalized block is ${finalizedBlock.hash}")
httpApiClient.getBlockByHash(finalizedBlock.hash) match {
engineApiClient.getBlockByHash(finalizedBlock.hash) match {
case Left(error) => logger.error(s"Could not load finalized block", error)
case Right(Some(finalizedEcBlock)) =>
logger.trace(s"Finalized block ${finalizedBlock.hash} is at height ${finalizedEcBlock.height}")
Expand Down Expand Up @@ -722,7 +719,7 @@ class ELUpdater(

private def requestBlock(hash: BlockHash): BlockRequestResult = {
logger.debug(s"Requesting block $hash")
httpApiClient.getBlockByHash(hash) match {
engineApiClient.getBlockByHash(hash) match {
case Right(Some(block)) => BlockRequestResult.BlockExists(block)
case Right(None) =>
requestAndProcessBlock(hash)
Expand Down Expand Up @@ -788,7 +785,7 @@ class ELUpdater(
): Option[Working[FollowingChain]] = {
@tailrec
def findLastEcBlock(curBlock: ContractBlock): EcBlock = {
httpApiClient.getBlockByHash(curBlock.hash) match {
engineApiClient.getBlockByHash(curBlock.hash) match {
case Right(Some(block)) => block
case Right(_) =>
chainContractClient.getBlock(curBlock.parentHash) match {
Expand Down Expand Up @@ -877,7 +874,7 @@ class ELUpdater(
private def waitForSyncCompletion(target: ContractBlock): Unit = scheduler.scheduleOnce(5.seconds)(state match {
case SyncingToFinalizedBlock(finalizedBlockHash) if finalizedBlockHash == target.hash =>
logger.debug(s"Checking if EL has synced to ${target.hash} on height ${target.height}")
httpApiClient.getLastExecutionBlock match {
engineApiClient.getLastExecutionBlock match {
case Left(error) =>
logger.error(s"Sync to ${target.hash} was not completed, error=${error.message}")
setState("23", Starting)
Expand Down Expand Up @@ -1209,11 +1206,11 @@ class ELUpdater(
private def mkRollbackBlock(rollbackTargetBlockId: BlockHash): Job[RollbackBlock] = for {
targetBlockFromContract <- Right(chainContractClient.getBlock(rollbackTargetBlockId))
targetBlockOpt <- targetBlockFromContract match {
case None => httpApiClient.getBlockByHash(rollbackTargetBlockId)
case None => engineApiClient.getBlockByHash(rollbackTargetBlockId)
case x => Right(x)
}
targetBlock <- Either.fromOption(targetBlockOpt, ClientError(s"Can't find block $rollbackTargetBlockId neither on a contract, nor in EC"))
parentBlock <- httpApiClient.getBlockByHash(targetBlock.parentHash)
parentBlock <- engineApiClient.getBlockByHash(targetBlock.parentHash)
parentBlock <- Either.fromOption(parentBlock, ClientError(s"Can't find parent block $rollbackTargetBlockId in execution client"))
rollbackBlockOpt <- engineApiClient.applyNewPayload(EmptyL2Block.mkExecutionPayload(parentBlock))
rollbackBlock <- Either.fromOption(rollbackBlockOpt, ClientError("Rollback block hash is not defined as latest valid hash"))
Expand All @@ -1226,7 +1223,7 @@ class ELUpdater(
}

private def getLastWithdrawalIndex(hash: BlockHash): Job[WithdrawalIndex] =
httpApiClient.getBlockByHash(hash).flatMap {
engineApiClient.getBlockByHash(hash).flatMap {
case None => Left(ClientError(s"Can't find $hash block on EC during withdrawal search"))
case Some(ecBlock) =>
ecBlock.withdrawals.lastOption match {
Expand All @@ -1239,7 +1236,7 @@ class ELUpdater(

private def getElToClTransfersRootHash(hash: BlockHash, elBridgeAddress: EthAddress): Job[Digest] =
for {
elRawLogs <- httpApiClient.getLogs(hash, Bridge.ElSentNativeEventTopic)
elRawLogs <- engineApiClient.getLogs(hash, Bridge.ElSentNativeEventTopic)
rootHash <- {
val relatedElRawLogs = elRawLogs.filter(x => x.address == elBridgeAddress && x.topics.contains(Bridge.ElSentNativeEventTopic))
Bridge
Expand Down Expand Up @@ -1413,7 +1410,7 @@ class ELUpdater(
if (fullValidationStatus.validated.contains(lastContractBlock.hash)) Right(BlockForValidation.NotFound)
else if (lastContractBlock.height <= finalizedBlock.height) Right(BlockForValidation.SkippedFinalized(lastContractBlock))
else
httpApiClient
engineApiClient
.getBlockByHash(lastContractBlock.hash)
.map {
case Some(ecBlock) => BlockForValidation.Found(lastContractBlock, ecBlock)
Expand Down
7 changes: 3 additions & 4 deletions src/main/scala/units/NetworkL2Block.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ import com.wavesplatform.account.PrivateKey
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.crypto
import com.wavesplatform.crypto.{DigestLength, SignatureLength}
import org.web3j.abi.datatypes.generated.Uint256
import play.api.libs.json.{JsObject, Json}
import units.client.L2BlockLike
import units.client.engine.model.Withdrawal
import units.client.http.model.EcBlock
import units.client.engine.model.{EcBlock, Withdrawal}
import units.eth.EthAddress
import units.util.HexBytesConverter.*
import org.web3j.abi.datatypes.generated.Uint256
import play.api.libs.json.{JsObject, Json}

// TODO Refactor to eliminate a manual deserialization, e.g. (raw: JsonObject, parsed: ParsedBlockL2)
class NetworkL2Block private (
Expand Down
15 changes: 13 additions & 2 deletions src/main/scala/units/client/engine/EngineApiClient.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package units.client.engine

import play.api.libs.json.*
import units.client.engine.EngineApiClient.PayloadId
import units.client.engine.model.*
import units.eth.EthAddress
import units.{BlockHash, Job}
import play.api.libs.json.*

trait EngineApiClient {

def forkChoiceUpdate(blockHash: BlockHash, finalizedBlockHash: BlockHash): Job[String] // TODO Replace String with an appropriate type

def forkChoiceUpdateWithPayloadId(
Expand All @@ -24,6 +23,18 @@ trait EngineApiClient {
def applyNewPayload(payload: JsObject): Job[Option[BlockHash]]

def getPayloadBodyByHash(hash: BlockHash): Job[Option[JsObject]]

def getBlockByNumber(number: BlockNumber): Job[Option[EcBlock]]

def getBlockByHash(hash: BlockHash): Job[Option[EcBlock]]

def getBlockByHashJson(hash: BlockHash, fullTxs: Boolean = false): Job[Option[JsObject]]

def getLastExecutionBlock: Job[EcBlock]

def blockExists(hash: BlockHash): Job[Boolean]

def getLogs(hash: BlockHash, topic: String): Job[List[GetLogsResponseEntry]]
}

object EngineApiClient {
Expand Down
46 changes: 43 additions & 3 deletions src/main/scala/units/client/engine/HttpEngineApiClient.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package units.client.engine

import cats.syntax.either.*
import cats.syntax.traverse.*
import play.api.libs.json.*
import sttp.client3.*
import units.client.JsonRpcClient
import units.client.engine.EngineApiClient.PayloadId
import units.client.engine.HttpEngineApiClient.*
import units.client.engine.model.*
import units.client.engine.model.ForkChoiceUpdatedRequest.ForkChoiceAttributes
import units.eth.EthAddress
import units.{BlockHash, ClientConfig, ClientError, Job}
import play.api.libs.json.*
import sttp.client3.*

import scala.concurrent.duration.{DurationInt, FiniteDuration}

Expand All @@ -17,7 +19,10 @@ class HttpEngineApiClient(val config: ClientConfig, val backend: SttpBackend[Ide
val apiUrl = uri"http://${config.executionClientAddress}:${config.engineApiPort}"

def forkChoiceUpdate(blockHash: BlockHash, finalizedBlockHash: BlockHash): Job[String] = {
sendEngineRequest[ForkChoiceUpdatedRequest, ForkChoiceUpdatedResponse](ForkChoiceUpdatedRequest(blockHash, finalizedBlockHash, None), BlockExecutionTimeout)
sendEngineRequest[ForkChoiceUpdatedRequest, ForkChoiceUpdatedResponse](
ForkChoiceUpdatedRequest(blockHash, finalizedBlockHash, None),
BlockExecutionTimeout
)
.flatMap {
case ForkChoiceUpdatedResponse(PayloadStatus(status, _, _), None) if status == "SYNCING" || status == "VALID" => Right(status)
case ForkChoiceUpdatedResponse(PayloadStatus(_, _, Some(validationError)), _) =>
Expand Down Expand Up @@ -72,6 +77,41 @@ class HttpEngineApiClient(val config: ClientConfig, val backend: SttpBackend[Ide
.map(_.value.headOption.flatMap(_.asOpt[JsObject]))
}

def getBlockByNumber(number: BlockNumber): Job[Option[EcBlock]] = {
for {
json <- getBlockByNumberJson(number.str)
blockMeta <- json.traverse(parseJson[EcBlock](_))
} yield blockMeta
}

def getBlockByHash(hash: BlockHash): Job[Option[EcBlock]] = {
sendRequest[GetBlockByHashRequest, EcBlock](GetBlockByHashRequest(hash, fullTxs = false))
.leftMap(err => ClientError(s"Error getting block by hash $hash: $err"))
}

def getBlockByHashJson(hash: BlockHash, fullTxs: Boolean = false): Job[Option[JsObject]] = {
sendRequest[GetBlockByHashRequest, JsObject](GetBlockByHashRequest(hash, fullTxs))
.leftMap(err => ClientError(s"Error getting block json by hash $hash: $err"))
}

def getLastExecutionBlock: Job[EcBlock] = for {
lastEcBlockOpt <- getBlockByNumber(BlockNumber.Latest)
lastEcBlock <- Either.fromOption(lastEcBlockOpt, ClientError("Impossible: EC doesn't have blocks"))
} yield lastEcBlock

def blockExists(hash: BlockHash): Job[Boolean] =
getBlockByHash(hash).map(_.isDefined)

private def getBlockByNumberJson(number: String): Job[Option[JsObject]] = {
sendRequest[GetBlockByNumberRequest, JsObject](GetBlockByNumberRequest(number))
.leftMap(err => ClientError(s"Error getting block by number $number: $err"))
}

override def getLogs(hash: BlockHash, topic: String): Job[List[GetLogsResponseEntry]] =
sendRequest[GetLogsRequest, List[GetLogsResponseEntry]](GetLogsRequest(hash, List(topic)))
.leftMap(err => ClientError(s"Error getting block logs by hash $hash: $err"))
.map(_.getOrElse(List.empty))

private def sendEngineRequest[A: Writes, B: Reads](request: A, timeout: FiniteDuration): Job[B] = {
sendRequest(request, timeout) match {
case Right(response) => response.toRight(ClientError(s"Unexpected engine API empty response"))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package units.client.http.model
package units.client.engine.model

import units.util.HexBytesConverter

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package units.client.http.model
package units.client.engine.model

import units.BlockHash
import units.client.L2BlockLike
import units.client.engine.model.Withdrawal
import units.eth.EthAddress
import units.util.HexBytesConverter.*
import org.web3j.abi.datatypes.generated.Uint256
import play.api.libs.functional.syntax.*
import play.api.libs.json.*
import play.api.libs.json.Format.GenericFormat
import units.BlockHash
import units.client.L2BlockLike
import units.eth.EthAddress
import units.util.HexBytesConverter.*

/** Block in EC API, not a payload of Engine API! See BlockHeader in besu.
* @param timestamp
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package units.client.http.model
package units.client.engine.model

import units.BlockHash
import play.api.libs.json.{Json, Writes}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package units.client.http.model
package units.client.engine.model

import play.api.libs.json.{Json, Writes}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package units.client.http.model
package units.client.engine.model

import units.BlockHash
import play.api.libs.json.{Json, Writes}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package units.client.http.model
package units.client.engine.model

import units.eth.EthAddress
import play.api.libs.json.{Json, Reads}
Expand Down
19 changes: 0 additions & 19 deletions src/main/scala/units/client/http/EcApiClient.scala

This file was deleted.

Loading

0 comments on commit fc21d01

Please sign in to comment.