Merge remote-tracking branch 'spring/main' into GH-6-proposer-policy-2
heifner committed May 1, 2024
2 parents 887b057 + 5a3550a commit af51f07
Showing 9 changed files with 330 additions and 24 deletions.
4 changes: 2 additions & 2 deletions .cicd/defaults.json
@@ -1,7 +1,7 @@
{
"cdt":{
"target":"hotstuff_integration",
"prerelease":false
"target":"main",
"prerelease":true
},
"referencecontracts":{
"ref":"main"
38 changes: 27 additions & 11 deletions libraries/chain/controller.cpp
@@ -3220,7 +3220,13 @@ struct controller_impl {
}

if ( s == controller::block_status::incomplete || s == controller::block_status::complete || s == controller::block_status::validated ) {
apply_s<void>(chain_head, [&](const auto& head) { create_and_send_vote_msg(head); });
if (!my_finalizers.empty()) {
apply_s<void>(chain_head, [&](const auto& head) {
boost::asio::post(thread_pool.get_executor(), [this, head=head]() {
create_and_send_vote_msg(head);
});
});
}
}

if (auto* dm_logger = get_deep_mind_logger(false)) {
@@ -3434,7 +3440,7 @@ struct controller_impl {
// not have been validated and we could not vote. At this point bsp->final_on_strong_qc_block_ref has been validated and we can vote.
// Only need to consider voting if not already validated, if already validated then we have already voted.
if (!already_valid)
consider_voting(bsp);
consider_voting(bsp, use_thread_pool_t::yes);

const signed_block_ptr& b = bsp->block;
const auto& new_protocol_feature_activations = bsp->get_new_protocol_feature_activations();
@@ -3788,7 +3794,7 @@ struct controller_impl {

if constexpr (savanna_mode) {
integrate_received_qc_to_block(bsp); // Save the received QC as soon as possible, no matter whether the block itself is valid or not
consider_voting(bsp);
consider_voting(bsp, use_thread_pool_t::no);
}

if (conf.terminate_at_block == 0 || bsp->block_num() <= conf.terminate_at_block) {
@@ -3895,19 +3901,29 @@ struct controller_impl {
}
}

void consider_voting(const block_state_legacy_ptr&) {}

enum class use_thread_pool_t { no, yes };
void consider_voting(const block_state_legacy_ptr&, use_thread_pool_t) {}
// thread safe
void consider_voting(const block_state_ptr& bsp) {
void consider_voting(const block_state_ptr& bsp, use_thread_pool_t use_thread_pool) {
// 1. Get the `core.final_on_strong_qc_block_num` for the block you are considering to vote on and use that to find the actual block ID
// of the ancestor block that has that block number.
// 2. If that block ID is for a non validated block, then do not vote for that block.
// 3. Otherwise, consider voting for that block according to the decide_vote rules.

if (bsp->core.final_on_strong_qc_block_num > 0) {
const auto& final_on_strong_qc_block_ref = bsp->core.get_block_reference(bsp->core.final_on_strong_qc_block_num);
if (fork_db_validated_block_exists(final_on_strong_qc_block_ref.block_id)) {
create_and_send_vote_msg(bsp);
if (!my_finalizers.empty() && bsp->core.final_on_strong_qc_block_num > 0) {
if (use_thread_pool == use_thread_pool_t::yes) {
boost::asio::post(thread_pool.get_executor(), [this, bsp=bsp]() {
const auto& final_on_strong_qc_block_ref = bsp->core.get_block_reference(bsp->core.final_on_strong_qc_block_num);
if (fork_db_validated_block_exists(final_on_strong_qc_block_ref.block_id)) {
create_and_send_vote_msg(bsp);
}
});
} else {
// bsp can be used directly instead of copy needed for post
const auto& final_on_strong_qc_block_ref = bsp->core.get_block_reference(bsp->core.final_on_strong_qc_block_num);
if (fork_db_validated_block_exists(final_on_strong_qc_block_ref.block_id)) {
create_and_send_vote_msg(bsp);
}
}
}
}
@@ -3917,7 +3933,7 @@
assert(bsp && bsp->block);

// consider voting again as final_on_strong_qc_block may have been validated since the bsp was created in create_block_state_i
consider_voting(bsp);
consider_voting(bsp, use_thread_pool_t::yes);

auto do_accept_block = [&](auto& forkdb) {
if constexpr (std::is_same_v<BSP, typename std::decay_t<decltype(forkdb.head())>>)
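The controller.cpp changes above move vote creation off the caller's thread: when a block becomes eligible for voting and the node actually has finalizers configured (!my_finalizers.empty()), create_and_send_vote_msg is dispatched to the controller's thread pool via boost::asio::post, while call sites that can safely vote inline pass use_thread_pool_t::no. What follows is only a minimal, self-contained sketch of that dispatch pattern, not Spring code; vote_scheduler, do_vote, and the int placeholder for block_state are hypothetical stand-ins.

// Minimal sketch of the thread-pool dispatch pattern (hypothetical names, not Spring APIs)
#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>
#include <functional>
#include <memory>

struct vote_scheduler {
   boost::asio::thread_pool pool{2};   // stands in for controller_impl's thread_pool
   bool have_finalizers = false;       // stands in for !my_finalizers.empty()

   enum class use_thread_pool_t { no, yes };

   void consider_voting(std::shared_ptr<const int> block_state, use_thread_pool_t use_pool,
                        const std::function<void(std::shared_ptr<const int>)>& do_vote) {
      if (!have_finalizers)
         return;                       // no local finalizers, nothing to sign
      if (use_pool == use_thread_pool_t::yes) {
         // copy the shared_ptr into the task so the state outlives the caller's stack frame
         boost::asio::post(pool.get_executor(),
                           [block_state, do_vote]() { do_vote(block_state); });
      } else {
         do_vote(block_state);         // caller is already on an appropriate thread; no copy needed
      }
   }
};

The else branch mirrors the in-diff comment that bsp can be used directly when no post is required, avoiding the shared_ptr copy that a deferred task needs.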
1 change: 1 addition & 0 deletions libraries/testing/tester.cpp
@@ -378,6 +378,7 @@ namespace eosio::testing {
if (itr == last_produced_block.end() || b->block_num() > block_header::num_from_id(itr->second)) {
last_produced_block[b->producer] = b->calculate_id();
}
_wait_for_vote_if_needed(*control);
}

signed_block_ptr base_tester::_produce_block( fc::microseconds skip_time, bool skip_pending_trxs ) {
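Because votes are now produced asynchronously on the thread pool, the tester calls _wait_for_vote_if_needed(*control) after recording a produced block so unit tests do not race ahead of their own finalizer votes. The body of that helper is not part of this diff; the sketch below is purely illustrative of how a harness could block until an async vote handler fires, assuming some voted-block notification exists to hook (all names here are hypothetical).

// Illustrative only: a wait-until-voted helper a test harness might use (hypothetical names)
#include <chrono>
#include <condition_variable>
#include <mutex>

struct vote_waiter {
   std::mutex              m;
   std::condition_variable cv;
   bool                    voted = false;

   // invoked from the thread-pool thread after the vote message has been created and sent
   void on_vote() {
      { std::lock_guard<std::mutex> g(m); voted = true; }
      cv.notify_one();
   }

   // invoked by the tester after producing a block; returns false if the vote never arrives
   bool wait(std::chrono::milliseconds timeout) {
      std::unique_lock<std::mutex> lk(m);
      bool ok = cv.wait_for(lk, timeout, [&]{ return voted; });
      voted = false;                   // reset for the next produced block
      return ok;
   }
};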
4 changes: 4 additions & 0 deletions tests/CMakeLists.txt
@@ -68,6 +68,8 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/nodeos_high_transaction_test.py ${CMA
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/nodeos_retry_transaction_test.py ${CMAKE_CURRENT_BINARY_DIR}/nodeos_retry_transaction_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/transition_to_if.py ${CMAKE_CURRENT_BINARY_DIR}/transition_to_if.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/disaster_recovery.py ${CMAKE_CURRENT_BINARY_DIR}/disaster_recovery.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/disaster_recovery_2.py ${CMAKE_CURRENT_BINARY_DIR}/disaster_recovery_2.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/disaster_recovery_2_test_shape.json ${CMAKE_CURRENT_BINARY_DIR}/disaster_recovery_2_test_shape.json COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/trx_finality_status_test.py ${CMAKE_CURRENT_BINARY_DIR}/trx_finality_status_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/trx_finality_status_forked_test.py ${CMAKE_CURRENT_BINARY_DIR}/trx_finality_status_forked_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/plugin_http_api_test.py ${CMAKE_CURRENT_BINARY_DIR}/plugin_http_api_test.py COPYONLY)
@@ -149,6 +151,8 @@ set_property(TEST transition_to_if_lr PROPERTY LABELS long_running_tests)

add_test(NAME disaster_recovery COMMAND tests/disaster_recovery.py -v ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST disaster_recovery PROPERTY LABELS nonparallelizable_tests)
add_test(NAME disaster_recovery_2 COMMAND tests/disaster_recovery_2.py -v ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST disaster_recovery_2 PROPERTY LABELS nonparallelizable_tests)

add_test(NAME ship_test COMMAND tests/ship_test.py -v --num-clients 10 --num-requests 5000 ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST ship_test PROPERTY LABELS nonparallelizable_tests)
25 changes: 15 additions & 10 deletions tests/disaster_recovery.py
@@ -7,18 +7,22 @@
from TestHarness.Node import BlockType

###############################################################
# disaster_recovery
# disaster_recovery - Scenario 1
#
# Verify that if one node in the network has locked blocks then consensus can continue.
#
# Integration test with 4 finalizers (A, B, C, and D).
#
# The 4 nodes are cleanly shutdown in the following state:
# - A has LIB N. A has a finalizer safety information file that locks on a block after N.
# - B, C, and D have LIB less than N. They have finalizer safety information files that lock on N.
# - B, C, and D have LIB less than or equal to N. They have finalizer safety information files that lock on N
# or a block after N.
#
# All nodes lose their reversible blocks and restart from an earlier snapshot.
# Nodes B, C, and D lose their reversible blocks. All nodes restart from an earlier snapshot.
#
# A is restarted and replays up to block N after restarting from snapshot. Block N is sent to the other
# nodes B, C, and D after they are also started up again.
# A is restarted and replays up to its last reversible block (which is a block number greater than N) after
# restarting from snapshot. Blocks N and later are sent to the other nodes B, C, and D after they are also
# started up again.
#
# Verify that LIB advances and that A, B, C, and D are eventually voting strong on new blocks.
#
@@ -70,8 +74,8 @@
Print(f"Snapshot head block number {ret_head_block_num}")

Print("Wait for snapshot node lib to advance")
node0.waitForBlock(ret_head_block_num+1, blockType=BlockType.lib)
assert node1.waitForLibToAdvance(), "Ndoe1 did not advance LIB after snapshot of Node0"
assert node0.waitForBlock(ret_head_block_num+1, blockType=BlockType.lib), "Node0 did not advance to make snapshot block LIB"
assert node1.waitForLibToAdvance(), "Node1 did not advance LIB after snapshot of Node0"

assert node0.waitForLibToAdvance(), "Node0 did not advance LIB after snapshot"
currentLIB = node0.getIrreversibleBlockNum()
@@ -82,9 +86,10 @@
for node in [node1, node2, node3]:
assert not node.verifyAlive(), "Node did not shutdown"

# node0 will have higher lib than 1,2,3 since it can incorporate QCs in blocks
Print("Wait for node 0 LIB to advance")
assert node0.waitForBlock(currentLIB, blockType=BlockType.lib), "Node0 did not advance LIB" # uses getBlockNum(blockType=blockType) > blockNum
# node0 is likely to have higher lib than 1,2,3 since it can incorporate QCs in blocks
Print("Wait for node 0 to advance")
# 4 producers, 3 of which are not producing; wait for 4 rounds to make sure node0 (defproducera) has time to produce
assert node0.waitForHeadToAdvance(blocksToAdvance=2, timeout=4*6), "Node0 did not advance"
node0.kill(signal.SIGTERM)
assert not node0.verifyAlive(), "Node0 did not shutdown"

149 changes: 149 additions & 0 deletions tests/disaster_recovery_2.py
@@ -0,0 +1,149 @@
#!/usr/bin/env python3
import os
import shutil
import signal
import time
from TestHarness import Cluster, TestHelper, Utils, WalletMgr
from TestHarness.Node import BlockType

###############################################################
# disaster_recovery - Scenario 2
#
# Verify that if finalizers are only locked on LIB blocks then all reversible blocks in the network can be lost
# and consensus can continue.
#
# Integration test with 5 nodes (A, B, C, D, and P). Nodes A, B, C, and D each have one finalizer but no proposers.
# Node P has a proposer but no finalizers. The finalizer policy consists of the four finalizers with a threshold of 3.
# The proposer policy involves just the single proposer P.
#
# A, B, C, and D can be connected to each other in any topology, as long as blocks sent to node A can reach the
# other nodes B, C, and D. However, node P should only be connected to node A.
#
# At some point after IF transition has completed and LIB is advancing, block production on node P should be paused.
# Enough time should be given to allow any in-flight votes on the latest produced blocks to be delivered to node P.
# Then, the connection between node P and node A should be severed, and then block production on node P resumed. The
# LIB on node P should advance to but then stall at block N. Then shortly after that, node P should be cleanly shut down.
#
# Verify that the LIB on A, B, C, and D has stalled and is less than block N. Then, nodes A, B, C, and D can all be
# cleanly shut down.
#
# Then, reversible blocks from all nodes should be removed. All nodes are restarted from an earlier
# snapshot (prior to block N).
#
# P is restarted and replays up to block N after restarting from snapshot. Blocks up to and including block N are sent
# to the other nodes A, B, C, and D after they are also started up again.
#
# Verify that LIB advances and that A, B, C, and D are eventually voting strong on new blocks.
###############################################################

Print=Utils.Print
errorExit=Utils.errorExit

args=TestHelper.parse_args({"-d","--keep-logs","--dump-error-details","-v","--leave-running","--unshared"})
pnodes=1
delay=args.d
debug=args.v
prod_count = 1 # per node prod count
total_nodes=pnodes+4
dumpErrorDetails=args.dump_error_details

Utils.Debug=debug
testSuccessful=False

cluster=Cluster(unshared=args.unshared, keepRunning=args.leave_running, keepLogs=args.keep_logs)
walletMgr=WalletMgr(True, keepRunning=args.leave_running, keepLogs=args.keep_logs)

try:
TestHelper.printSystemInfo("BEGIN")

cluster.setWalletMgr(walletMgr)

Print(f'producing nodes: {pnodes}, delay between nodes launch: {delay} second{"s" if delay != 1 else ""}')

Print("Stand up cluster")
specificExtraNodeosArgs={}
specificExtraNodeosArgs[0]="--plugin eosio::net_api_plugin --plugin eosio::producer_api_plugin "

if cluster.launch(pnodes=pnodes, totalNodes=total_nodes, totalProducers=pnodes, specificExtraNodeosArgs=specificExtraNodeosArgs,
topo="./tests/disaster_recovery_2_test_shape.json", delay=delay, loadSystemContract=False,
activateIF=True, signatureProviderForNonProducer=True) is False:
errorExit("Failed to stand up eos cluster.")

assert cluster.biosNode.getInfo(exitOnError=True)["head_block_producer"] != "eosio", "launch should have waited for production to change"

cluster.biosNode.kill(signal.SIGTERM)
cluster.waitOnClusterSync(blockAdvancing=5)

node0 = cluster.getNode(0) # P
node1 = cluster.getNode(1) # A
node2 = cluster.getNode(2) # B
node3 = cluster.getNode(3) # C
node4 = cluster.getNode(4) # D

Print("Create snapshot (node 0)")
ret = node0.createSnapshot()
assert ret is not None, "Snapshot creation failed"
ret_head_block_num = ret["payload"]["head_block_num"]
Print(f"Snapshot head block number {ret_head_block_num}")

Print("Wait for snapshot node lib to advance")
assert node0.waitForBlock(ret_head_block_num+1, blockType=BlockType.lib), "Node0 did not advance to make snapshot block LIB"
assert node1.waitForLibToAdvance(), "Node1 did not advance LIB after snapshot of Node0"

assert node0.waitForLibToAdvance(), "Node0 did not advance LIB after snapshot"

Print("Pause production on Node0")
# loop until LIB advances after the pause; the pause may happen between blocks, so the current block needs to be produced
retrys = 10
while retrys > 0:
lib = node0.getIrreversibleBlockNum()
node0.processUrllibRequest("producer", "pause")
# wait for lib because waitForBlock uses > not >=
if node0.waitForBlock(lib, blockType=BlockType.lib, timeout=10):
break
node0.processUrllibRequest("producer", "resume")
time.sleep(0.25)
retrys -= 1
assert retrys > 0, "Node0 did not advance LIB after pause"
time.sleep(1)

Print("Disconnect the producing node (Node0) from peer Node1")
node0.processUrllibRequest("net", "disconnect", "localhost:9877")
assert not node0.waitForLibToAdvance(timeout=10), "Node0 LIB still advancing after disconnect"

Print("Resume production on Node0")
node0.processUrllibRequest("producer", "resume")
assert node0.waitForHeadToAdvance(blocksToAdvance=2)
libN = node0.getIrreversibleBlockNum()

assert not node1.waitForHeadToAdvance(timeout=5), "Node1 head still advancing after disconnect"

for node in [node1, node2, node3, node4]:
lib = node.getIrreversibleBlockNum()
assert lib < libN, f"Node LIB {lib} >= LIB N {libN}"

for node in [node0, node1, node2, node3, node4]:
node.kill(signal.SIGTERM)

for node in [node0, node1, node2, node3, node4]:
assert not node.verifyAlive(), "Node did not shutdown"

for node in [node0, node1, node2, node3, node4]:
node.removeReversibleBlks()
node.removeState()

for i in range(5):
isRelaunchSuccess = cluster.getNode(i).relaunch(chainArg=" -e --snapshot {}".format(node0.getLatestSnapshot()))
assert isRelaunchSuccess, f"node {i} relaunch from snapshot failed"

for node in [node0, node1, node2, node3, node4]:
assert node.waitForLibToAdvance(), "Node did not advance LIB after relaunch"
lib = node.getIrreversibleBlockNum()
assert lib > libN, f"Node LIB {lib} <= LIB N {libN}"

testSuccessful=True
finally:
TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful, dumpErrorDetails=dumpErrorDetails)

exitCode = 0 if testSuccessful else 1
exit(exitCode)