Skip to content

Commit

Permalink
Feature/syncing node (#1648)
Browse files Browse the repository at this point in the history
* Syncing node

Signed-off-by: iceseer <[email protected]>
Signed-off-by: Alexander Lednev <[email protected]>

* YAC removed

Signed-off-by: iceseer <[email protected]>
Signed-off-by: Alexander Lednev <[email protected]>

* RocksDB store sync peers

Signed-off-by: iceseer <[email protected]>
Signed-off-by: Alexander Lednev <[email protected]>

* Peers refactoring

Signed-off-by: iceseer <[email protected]>
Signed-off-by: Alexander Lednev <[email protected]>

* Ledger state

Signed-off-by: iceseer <[email protected]>
Signed-off-by: Alexander Lednev <[email protected]>
  • Loading branch information
iceseer committed Dec 29, 2021
1 parent 0fc3c5d commit 653c61f
Show file tree
Hide file tree
Showing 71 changed files with 732 additions and 271 deletions.
23 changes: 16 additions & 7 deletions irohad/ametsuchi/impl/mutable_storage_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,22 @@ namespace iroha::ametsuchi {
block_storage_->insert(block);
block_index_->index(*block);

auto opt_ledger_peers = peer_query_->getLedgerPeers();
if (not opt_ledger_peers) {
log_->error("Failed to get ledger peers!");
return false;
}
boost::optional<
std::vector<std::shared_ptr<shared_model::interface::Peer>>>
opt_ledger_peers[] = {peer_query_->getLedgerPeers(false),
peer_query_->getLedgerPeers(true)};

for (auto const &peer_list : opt_ledger_peers)
if (!peer_list) {
log_->error("Failed to get ledger peers!");
return false;
}

ledger_state_ = std::make_shared<const LedgerState>(
std::move(*opt_ledger_peers), block->height(), block->hash());
std::move(*(opt_ledger_peers[0])), // peers
std::move(*(opt_ledger_peers[1])), // syncing peers
block->height(),
block->hash());
}

return block_applied;
Expand Down Expand Up @@ -161,7 +169,8 @@ namespace iroha::ametsuchi {
try {
db_tx_.rollback();
} catch (std::exception &e) {
log_->warn("~MutableStorageImpl(): rollback failed. Reason: {}", e.what());
log_->warn("~MutableStorageImpl(): rollback failed. Reason: {}",
e.what());
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions irohad/ametsuchi/impl/peer_query_wsv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ namespace iroha {
PeerQueryWsv::PeerQueryWsv(std::shared_ptr<WsvQuery> wsv)
: wsv_(std::move(wsv)) {}

boost::optional<std::vector<PeerQuery::wPeer>>
PeerQueryWsv::getLedgerPeers() {
return wsv_->getPeers();
boost::optional<std::vector<PeerQuery::wPeer>> PeerQueryWsv::getLedgerPeers(
bool syncing_peers) {
return wsv_->getPeers(syncing_peers);
}

boost::optional<PeerQuery::wPeer> PeerQueryWsv::getLedgerPeerByPublicKey(
Expand Down
3 changes: 2 additions & 1 deletion irohad/ametsuchi/impl/peer_query_wsv.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ namespace iroha {
* Fetch peers stored in ledger
* @return list of peers in insertion to ledger order
*/
boost::optional<std::vector<wPeer>> getLedgerPeers() override;
boost::optional<std::vector<wPeer>> getLedgerPeers(
bool syncing_peers) override;

/**
* Fetch peer with given public key from ledger
Expand Down
72 changes: 70 additions & 2 deletions irohad/ametsuchi/impl/postgres_command_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,26 @@ namespace iroha {
AND NOT (SELECT * FROM has_root_perm) THEN 2
WHEN NOT (SELECT * FROM has_perm) THEN 2)"});

add_sync_peer_statements_ = makeCommandStatements(
sql_,
R"(
WITH %s
inserted AS (
INSERT INTO sync_peer(public_key, address, tls_certificate)
(
SELECT lower(:pubkey), :address, :tls_certificate
%s
) RETURNING (1)
)
SELECT CASE WHEN EXISTS (SELECT * FROM inserted) THEN 0
%s
ELSE 1 END AS result)",
{(boost::format(R"(has_perm AS (%s),)")
% checkAccountRolePermission(Role::kAddPeer, ":creator"))
.str(),
"WHERE (SELECT * FROM has_perm)",
"WHEN NOT (SELECT * from has_perm) THEN 2"});

compare_and_set_account_detail_statements_ = makeCommandStatements(
sql_,
R"(
Expand Down Expand Up @@ -1176,6 +1196,40 @@ namespace iroha {
R"( AND (SELECT * FROM has_perm))",
R"( WHEN NOT (SELECT * FROM has_perm) THEN 2 )"});

remove_sync_peer_statements_ = makeCommandStatements(
sql_,
R"(
WITH %s
removed AS (
DELETE FROM sync_peer WHERE public_key = lower(:pubkey)
%s
RETURNING (1)
)
SELECT CASE
WHEN EXISTS (SELECT * FROM removed) THEN 0
%s
ELSE 1
END AS result)",
{(boost::format(R"(
has_perm AS (%s),
get_peer AS (
SELECT * from sync_peer WHERE public_key = lower(:pubkey) LIMIT 1
),
check_peers AS (
SELECT 1 WHERE (SELECT COUNT(*) FROM sync_peer) > 0
),)")
% checkAccountRolePermission(
Role::kAddPeer, Role::kRemovePeer, ":creator"))
.str(),
R"(
AND (SELECT * FROM has_perm)
AND EXISTS (SELECT * FROM get_peer)
AND EXISTS (SELECT * FROM check_peers))",
R"(
WHEN NOT EXISTS (SELECT * from get_peer) THEN 3
WHEN NOT EXISTS (SELECT * from check_peers) THEN 4
WHEN NOT (SELECT * from has_perm) THEN 2)"});

set_quorum_statements_ = makeCommandStatements(
sql_,
R"(
Expand Down Expand Up @@ -1499,8 +1553,12 @@ namespace iroha {
bool do_validation) {
auto &peer = command.peer();

StatementExecutor executor(
add_peer_statements_, do_validation, "AddPeer", perm_converter_);
StatementExecutor executor(peer.isSyncingPeer()
? add_sync_peer_statements_
: add_peer_statements_,
do_validation,
"AddPeer",
perm_converter_);
executor.use("creator", creator_account_id);
executor.use("address", peer.address());
executor.use("pubkey", peer.pubkey());
Expand Down Expand Up @@ -1795,6 +1853,16 @@ namespace iroha {
bool do_validation) {
auto pubkey = command.pubkey();

{
StatementExecutor executor(remove_sync_peer_statements_,
do_validation,
"RemovePeer",
perm_converter_);
executor.use("creator", creator_account_id);
executor.use("pubkey", pubkey);
executor.execute();
}

StatementExecutor executor(remove_peer_statements_,
do_validation,
"RemovePeer",
Expand Down
2 changes: 2 additions & 0 deletions irohad/ametsuchi/impl/postgres_command_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ namespace iroha {

std::unique_ptr<CommandStatements> add_asset_quantity_statements_;
std::unique_ptr<CommandStatements> add_peer_statements_;
std::unique_ptr<CommandStatements> add_sync_peer_statements_;
std::unique_ptr<CommandStatements> add_signatory_statements_;
std::unique_ptr<CommandStatements> append_role_statements_;
std::unique_ptr<CommandStatements>
Expand All @@ -266,6 +267,7 @@ namespace iroha {
std::unique_ptr<CommandStatements> detach_role_statements_;
std::unique_ptr<CommandStatements> grant_permission_statements_;
std::unique_ptr<CommandStatements> remove_peer_statements_;
std::unique_ptr<CommandStatements> remove_sync_peer_statements_;
std::unique_ptr<CommandStatements> remove_signatory_statements_;
std::unique_ptr<CommandStatements> revoke_permission_statements_;
std::unique_ptr<CommandStatements> set_account_detail_statements_;
Expand Down
13 changes: 7 additions & 6 deletions irohad/ametsuchi/impl/postgres_specific_query_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1481,6 +1481,9 @@ namespace iroha {
R"(WITH has_perms AS ({})
SELECT public_key, address, tls_certificate, perm FROM peer
RIGHT OUTER JOIN has_perms ON TRUE
UNION
SELECT public_key, address, tls_certificate, perm FROM sync_peer
RIGHT OUTER JOIN has_perms ON TRUE
)",
getAccountRolePermissionCheckSql(Role::kGetPeers));

Expand All @@ -1500,12 +1503,10 @@ namespace iroha {
if (peer_key and address) {
peers.push_back(
std::make_shared<shared_model::plain::Peer>(
*address, *std::move(peer_key), tls_certificate));
} else {
log_->error(
"Address or public key not set for some peer!");
assert(peer_key);
assert(address);
*address,
*std::move(peer_key),
tls_certificate,
false));
}
});
}
Expand Down
14 changes: 10 additions & 4 deletions irohad/ametsuchi/impl/postgres_wsv_command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,10 @@ namespace iroha {
WsvCommandResult PostgresWsvCommand::insertPeer(
const shared_model::interface::Peer &peer) {
soci::statement st = sql_.prepare
<< "INSERT INTO peer(public_key, address, tls_certificate)"
" VALUES (lower(:pk), :address, :tls_certificate)";
<< fmt::format("INSERT INTO {}(public_key, address, "
"tls_certificate) VALUES (lower(:pk), :address, "
":tls_certificate)",
peer.isSyncingPeer() ? "sync_peer" : "peer");
st.exchange(soci::use(peer.pubkey()));
st.exchange(soci::use(peer.address()));
st.exchange(soci::use(peer.tlsCertificate()));
Expand All @@ -329,8 +331,12 @@ namespace iroha {
WsvCommandResult PostgresWsvCommand::deletePeer(
const shared_model::interface::Peer &peer) {
soci::statement st = sql_.prepare
<< "DELETE FROM peer WHERE public_key = lower(:pk) AND address = "
":address";
<< fmt::format("DELETE FROM {} WHERE public_key = "
"lower(:pk) AND address = :address",
peer.isSyncingPeer() ? "sync_peer" : "peer"

);

st.exchange(soci::use(peer.pubkey()));
st.exchange(soci::use(peer.address()));

Expand Down
34 changes: 20 additions & 14 deletions irohad/ametsuchi/impl/postgres_wsv_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@
namespace {
template <typename T>
boost::optional<std::vector<std::shared_ptr<shared_model::interface::Peer>>>
getPeersFromSociRowSet(T &&rowset) {
getPeersFromSociRowSet(T &&rowset, bool syncing_peer) {
return iroha::ametsuchi::flatMapValues<
std::vector<std::shared_ptr<shared_model::interface::Peer>>>(
std::forward<T>(rowset),
[&](auto &public_key, auto &address, auto &tls_certificate) {
return boost::make_optional(
std::make_shared<shared_model::plain::Peer>(
address, std::move(public_key), tls_certificate));
std::make_shared<shared_model::plain::Peer>(address,
std::move(public_key),
tls_certificate,
syncing_peer));
});
}
} // namespace
Expand Down Expand Up @@ -69,15 +71,19 @@ namespace iroha {
}

boost::optional<std::vector<std::shared_ptr<shared_model::interface::Peer>>>
PostgresWsvQuery::getPeers() {
PostgresWsvQuery::getPeers(bool syncing_peers) {
using T = boost::
tuple<std::string, AddressType, std::optional<TLSCertificateType>>;
auto result = execute<T>([&] {
return (sql_.prepare
<< "SELECT public_key, address, tls_certificate FROM peer");
return (
sql_.prepare
<< (syncing_peers
? "SELECT public_key, address, tls_certificate FROM "
"sync_peer"
: "SELECT public_key, address, tls_certificate FROM peer"));
});

return getPeersFromSociRowSet(result);
return getPeersFromSociRowSet(result, syncing_peers);
}

iroha::expected::Result<size_t, std::string> PostgresWsvQuery::count(
Expand All @@ -92,9 +98,9 @@ namespace iroha {
return iroha::expected::makeError(msg);
}

iroha::expected::Result<size_t, std::string>
PostgresWsvQuery::countPeers() {
return count("peer");
iroha::expected::Result<size_t, std::string> PostgresWsvQuery::countPeers(
bool syncing_peers) {
return count(syncing_peers ? "sync_peer" : "peer");
}

iroha::expected::Result<size_t, std::string>
Expand All @@ -117,13 +123,13 @@ namespace iroha {
std::string target_public_key{public_key};
auto result = execute<T>([&] {
return (sql_.prepare << R"(
SELECT public_key, address, tls_certificate
FROM peer
WHERE public_key = :public_key)",
SELECT public_key, address, tls_certificate FROM peer WHERE public_key = :public_key
UNION
SELECT public_key, address, tls_certificate FROM sync_peer WHERE public_key = :public_key)",
soci::use(target_public_key, "public_key"));
});

return getPeersFromSociRowSet(result) | [](auto &&peers)
return getPeersFromSociRowSet(result, false) | [](auto &&peers)
-> boost::optional<
std::shared_ptr<shared_model::interface::Peer>> {
if (!peers.empty()) {
Expand Down
5 changes: 3 additions & 2 deletions irohad/ametsuchi/impl/postgres_wsv_query.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ namespace iroha {

boost::optional<
std::vector<std::shared_ptr<shared_model::interface::Peer>>>
getPeers() override;
getPeers(bool syncing_peers) override;

iroha::expected::Result<size_t, std::string> countPeers() override;
iroha::expected::Result<size_t, std::string> countPeers(
bool syncing_peers) override;
iroha::expected::Result<size_t, std::string> countDomains() override;
iroha::expected::Result<size_t, std::string> countTransactions() override;

Expand Down
Loading

0 comments on commit 653c61f

Please sign in to comment.