From a538a67489d5d283f5a5cc95abc982259515ffd2 Mon Sep 17 00:00:00 2001 From: 0x009922 <43530070+0x009922@users.noreply.github.com> Date: Tue, 11 Jun 2024 10:24:32 +0900 Subject: [PATCH] refactor: supervise spawned tasks Signed-off-by: 0x009922 <43530070+0x009922@users.noreply.github.com> --- Cargo.lock | 186 ++-- Cargo.toml | 1 + cli/Cargo.toml | 7 +- cli/src/lib.rs | 467 ++++------ cli/src/main.rs | 23 +- .../extra_functional/restart_peer.rs | 2 +- core/Cargo.toml | 1 - core/benches/blocks/common.rs | 2 +- core/benches/kura.rs | 5 +- core/benches/validation.rs | 4 +- core/src/block.rs | 8 +- core/src/block_sync.rs | 19 +- core/src/gossiper.rs | 22 +- core/src/kiso.rs | 19 +- core/src/kura.rs | 36 +- core/src/lib.rs | 37 - core/src/query/store.rs | 68 +- core/src/queue.rs | 22 +- core/src/smartcontracts/isi/mod.rs | 2 +- core/src/smartcontracts/isi/query.rs | 10 +- core/src/smartcontracts/wasm.rs | 12 +- core/src/snapshot.rs | 42 +- core/src/state.rs | 4 +- core/src/sumeragi/main_loop.rs | 45 +- core/src/sumeragi/mod.rs | 59 +- core/test_network/Cargo.toml | 3 +- core/test_network/src/lib.rs | 44 +- futures/Cargo.toml | 5 +- futures/src/lib.rs | 3 + futures/src/supervisor.rs | 843 ++++++++++++++++++ logger/src/lib.rs | 8 +- p2p/Cargo.toml | 1 + p2p/src/network.rs | 24 +- p2p/tests/integration/p2p.rs | 8 +- telemetry/src/dev.rs | 5 +- torii/src/lib.rs | 63 +- 36 files changed, 1388 insertions(+), 722 deletions(-) create mode 100644 futures/src/supervisor.rs diff --git a/Cargo.lock b/Cargo.lock index 0284f24a6be..c1c9f6d91fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -131,9 +131,9 @@ dependencies = [ [[package]] name = "anstyle-query" -version = "1.0.3" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a64c907d4e79225ac72e2a354c9ce84d50ebb4586dee56c82b3ee73004f537f5" +checksum = "ad186efb764318d35165f1758e7dcef3b10628e26d41a44bc5550652e6804391" dependencies = [ "windows-sys 0.52.0", ] @@ -518,9 +518,9 @@ dependencies = [ [[package]] name = "borsh" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbe5b10e214954177fb1dc9fbd20a1a2608fe99e6c832033bdc7cea287a20d77" +checksum = "a6362ed55def622cddc70a4746a68554d7b687713770de539e59a739b249f8ed" dependencies = [ "borsh-derive", "cfg_aliases", @@ -528,9 +528,9 @@ dependencies = [ [[package]] name = "borsh-derive" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7a8646f94ab393e43e8b35a2558b1624bed28b97ee09c5d15456e3c9463f46d" +checksum = "c3ef8005764f53cd4dca619f5bf64cafd4664dada50ece25e4d81de54c80cc0b" dependencies = [ "once_cell", "proc-macro-crate", @@ -657,9 +657,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.0.97" +version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "099a5357d84c4c61eb35fc8eafa9a79a902c2f76911e5747ced4e032edd8d9b4" +checksum = "41c270e7540d725e65ac7f1b212ac8ce349719624d7bcff99f8e2e488e8cf03f" dependencies = [ "jobserver", "libc", @@ -674,9 +674,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "cfg_aliases" -version = "0.1.1" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = 
"chacha20" @@ -757,9 +757,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.4" +version = "4.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90bc066a67923782aa8515dbaea16946c5bcc5addbd668bb80af688e53e548a0" +checksum = "a9689a29b593160de5bc4aacab7b5d54fb52231de70122626c178e6a368994c7" dependencies = [ "clap_builder", "clap_derive", @@ -767,9 +767,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.2" +version = "4.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae129e2e766ae0ec03484e609954119f123cc1fe650337e155d03b022f24f7b4" +checksum = "2e5387378c84f6faa26890ebf9f0a92989f8873d4d380467bcd0d8d8620424df" dependencies = [ "anstream", "anstyle", @@ -779,9 +779,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.4" +version = "4.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "528131438037fd55894f62d6e9f068b8f45ac57ffa77517819645d10aed04f64" +checksum = "c780290ccf4fb26629baa7a1081e68ced113f1d3ec302fa5948f1c381ebf06c6" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -791,9 +791,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.0" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" +checksum = "4b82cf0babdbd58558212896d1a4272303a57bdb245c2bf1147185fb45640e70" [[package]] name = "clru" @@ -856,9 +856,9 @@ dependencies = [ [[package]] name = "concread" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4539869aeea73afd414cc1750eceada0d042764f2d28873d74fbbd81610bffe" +checksum = "23bef63c371d1b3da7e61e7b72e5757f070131a399f2eb60edc2d8bb8102249a" dependencies = [ "ahash 0.8.11", "arc-swap", @@ -1084,9 +1084,9 @@ dependencies = [ [[package]] name = "crc32fast" -version = "1.4.0" +version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3855a8a784b474f333699ef2bbca9db2c4a1f6d9088a90a2d25b1eb53111eaa" +checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" dependencies = [ "cfg-if", ] @@ -1129,9 +1129,9 @@ dependencies = [ [[package]] name = "crossbeam-channel" -version = "0.5.12" +version = "0.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95" +checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" dependencies = [ "crossbeam-utils", ] @@ -1166,9 +1166,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.19" +version = "0.8.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" [[package]] name = "crossterm" @@ -1253,9 +1253,9 @@ dependencies = [ [[package]] name = "cxx" -version = "1.0.122" +version = "1.0.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb497fad022245b29c2a0351df572e2d67c1046bcef2260ebc022aec81efea82" +checksum = "8194f089b6da4751d6c1da1ef37c17255df51f9346cdb160f8b096562ae4a85c" dependencies = [ "cc", "cxxbridge-flags", @@ -1265,9 +1265,9 @@ dependencies = [ [[package]] name = "cxx-build" -version = "1.0.122" +version = "1.0.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"9327c7f9fbd6329a200a5d4aa6f674c60ab256525ff0084b52a889d4e4c60cee" +checksum = "1e8df9a089caae66634d754672d5f909395f30f38af6ff19366980d8a8b57501" dependencies = [ "cc", "codespan-reporting", @@ -1280,15 +1280,15 @@ dependencies = [ [[package]] name = "cxxbridge-flags" -version = "1.0.122" +version = "1.0.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "688c799a4a846f1c0acb9f36bb9c6272d9b3d9457f3633c7753c6057270df13c" +checksum = "25290be4751803672a70b98c68b51c1e7d0a640ab5a4377f240f9d2e70054cd1" [[package]] name = "cxxbridge-macro" -version = "1.0.122" +version = "1.0.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "928bc249a7e3cd554fd2e8e08a426e9670c50bbfc9a621653cfa9accc9641783" +checksum = "b8cb317cb13604b4752416783bb25070381c36e844743e4146b7f8e55de7d140" dependencies = [ "proc-macro2", "quote", @@ -1536,9 +1536,9 @@ dependencies = [ [[package]] name = "either" -version = "1.11.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a47c1c47d2f5964e29c61246e81db715514cd532db6b5116a25ea3c03d6780a2" +checksum = "3dca9240753cf90908d7e4aac30f630662b02aebaa1b58a3cadabdb23385b58b" [[package]] name = "elliptic-curve" @@ -2161,9 +2161,9 @@ dependencies = [ [[package]] name = "gix-macros" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1dff438f14e67e7713ab9332f5fd18c8f20eb7eb249494f6c2bf170522224032" +checksum = "999ce923619f88194171a67fb3e6d613653b8d4d6078b529b15a765da0edcc17" dependencies = [ "proc-macro2", "quote", @@ -2393,9 +2393,9 @@ dependencies = [ [[package]] name = "gix-validate" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e39fc6e06044985eac19dd34d474909e517307582e462b2eb4c8fa51b6241545" +checksum = "82c27dd34a49b1addf193c92070bcbf3beaf6e10f16a78544de6372e146a0acf" dependencies = [ "bstr", "thiserror", @@ -2634,9 +2634,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.28" +version = "0.14.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" +checksum = "f361cde2f109281a220d4307746cdfd5ee3f410da58a70377762396775634b33" dependencies = [ "bytes", "futures-channel", @@ -3137,8 +3137,11 @@ dependencies = [ "rand", "serde", "serde_json", + "thiserror", + "thread-local-panic-hook", "tokio", "tokio-stream", + "tokio-util", ] [[package]] @@ -3237,6 +3240,7 @@ dependencies = [ "iroha_crypto", "iroha_data_model", "iroha_data_model_derive", + "iroha_futures", "iroha_logger", "iroha_primitives", "parity-scale-codec", @@ -3582,8 +3586,8 @@ dependencies = [ "supports-color 2.1.0", "tempfile", "thiserror", - "thread-local-panic-hook", "tokio", + "tokio-util", "toml 0.8.14", "tracing", "vergen", @@ -3750,9 +3754,9 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" [[package]] name = "libc" -version = "0.2.154" +version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae743338b92ff9146ce83992f766a31066a91a8c84a45e0e9f21e7cf6de6d346" +checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" [[package]] name = "libflate" @@ -3816,9 +3820,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.4.13" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" 
-checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" +checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" [[package]] name = "lock_api" @@ -3962,9 +3966,9 @@ checksum = "933dca44d65cdd53b355d0b73d380a2ff5da71f87f036053188bf1eab6a19881" [[package]] name = "miniz_oxide" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7" +checksum = "87dfd01fe195c66b572b37921ad8803d010623c0aca821bea2302239d155cdae" dependencies = [ "adler", ] @@ -4001,11 +4005,10 @@ dependencies = [ [[package]] name = "native-tls" -version = "0.2.11" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466" dependencies = [ - "lazy_static", "libc", "log", "openssl", @@ -4180,9 +4183,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-src" -version = "300.2.3+3.2.1" +version = "300.3.1+3.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cff92b6f71555b61bb9315f7c64da3ca43d87531622120fea0195fc761b4843" +checksum = "7259953d42a81bf137fbbd73bd30a8e1914d6dce43c2b90ed575783a22608b91" dependencies = [ "cc", ] @@ -4435,9 +4438,9 @@ checksum = "db23d408679286588f4d4644f965003d056e3dd5abcaaa938116871d7ce2fee7" [[package]] name = "plotters" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2c224ba00d7cadd4d5c660deaf2098e5e80e07846537c51f9cfa4be50c1fd45" +checksum = "a15b6eccb8484002195a3e44fe65a4ce8e93a625797a063735536fd59cb01cf3" dependencies = [ "num-traits", "plotters-backend", @@ -4448,15 +4451,15 @@ dependencies = [ [[package]] name = "plotters-backend" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e76628b4d3a7581389a35d5b6e2139607ad7c75b17aed325f210aa91f4a9609" +checksum = "414cec62c6634ae900ea1c56128dfe87cf63e7caece0852ec76aba307cebadb7" [[package]] name = "plotters-svg" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38f6d39893cca0701371e3c27294f09797214b86f1fb951b89ade8ec04e2abab" +checksum = "81b30686a7d9c3e010b84284bdd26a29f2138574f52f5eb6f794fc0ad924e705" dependencies = [ "plotters-backend", ] @@ -4580,9 +4583,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.12.4" +version = "0.12.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0f5d036824e4761737860779c906171497f6d55681139d8312388f8fe398922" +checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" dependencies = [ "bytes", "prost-derive", @@ -4590,9 +4593,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.12.5" +version = "0.12.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9554e3ab233f0a932403704f1a1d08c30d5ccd931adfdfa1e8b5a19b52c1d55a" +checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", "itertools 0.12.1", @@ -4603,9 +4606,9 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.12.4" +version = "0.12.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"3235c33eb02c1f1e212abdbe34c78b264b038fb58ca612664343271e36e55ffe" +checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0" dependencies = [ "prost", ] @@ -4979,9 +4982,9 @@ checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" [[package]] name = "rustls-webpki" -version = "0.102.3" +version = "0.102.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3bce581c0dd41bce533ce695a1437fa16a7ab5ac3ccfa99fe1a620a7885eabf" +checksum = "ff448f7e92e913c4b7d4c6d8e4540a1724b319b4152b8aef6d4cf8339712b33e" dependencies = [ "ring", "rustls-pki-types", @@ -5544,9 +5547,9 @@ dependencies = [ [[package]] name = "symbolic-common" -version = "12.8.0" +version = "12.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cccfffbc6bb3bb2d3a26cd2077f4d055f6808d266f9d4d158797a4c60510dfe" +checksum = "89d2aef0f60f62e38c472334148758afbd570ed78d20be622692e5ebfec3734f" dependencies = [ "debugid", "memmap2", @@ -5556,9 +5559,9 @@ dependencies = [ [[package]] name = "symbolic-demangle" -version = "12.8.0" +version = "12.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76a99812da4020a67e76c4eb41f08c87364c14170495ff780f30dd519c221a68" +checksum = "1719d1292eac816cdd3fdad12b22315624b7ce6a7bacb267a3a27fccfd286b48" dependencies = [ "cpp_demangle 0.4.3", "rustc-demangle", @@ -5613,9 +5616,9 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "tar" -version = "0.4.40" +version = "0.4.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b16afcea1f22891c49a00c751c7b63b2233284064f11a200fc624137c51e2ddb" +checksum = "cb797dad5fb5b76fcf519e702f4a589483b5ef06567f160c392832c1f5e44909" dependencies = [ "filetime", "libc", @@ -5660,6 +5663,7 @@ dependencies = [ "iroha_core", "iroha_crypto", "iroha_data_model", + "iroha_futures", "iroha_genesis", "iroha_logger", "iroha_primitives", @@ -5889,6 +5893,8 @@ dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", + "hashbrown 0.14.5", "pin-project-lite", "tokio", ] @@ -5944,7 +5950,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "winnow 0.6.8", + "winnow 0.6.13", ] [[package]] @@ -6206,9 +6212,9 @@ checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" [[package]] name = "unicode-width" -version = "0.1.12" +version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68f5e5f3158ecfd4b8ff6fe086db7c8467a2dfdac97fe420f2b7c4aa97af66d6" +checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d" [[package]] name = "unicode-xid" @@ -6474,9 +6480,9 @@ dependencies = [ [[package]] name = "wasm-encoder" -version = "0.207.0" +version = "0.209.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d996306fb3aeaee0d9157adbe2f670df0236caf19f6728b221e92d0f27b3fe17" +checksum = "7b4a05336882dae732ce6bd48b7e11fe597293cb72c13da4f35d7d5f8d53b2a7" dependencies = [ "leb128", ] @@ -6817,22 +6823,22 @@ checksum = "9b6060bc082cc32d9a45587c7640e29e3c7b89ada82677ac25d87850aaccb368" [[package]] name = "wast" -version = "207.0.0" +version = "209.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e40be9fd494bfa501309487d2dc0b3f229be6842464ecbdc54eac2679c84c93" +checksum = "8fffef2ff6147e4d12e972765fd75332c6a11c722571d4ab7a780d81ffc8f0a4" dependencies = [ "bumpalo", "leb128", "memchr", "unicode-width", - "wasm-encoder 0.207.0", + 
"wasm-encoder 0.209.1", ] [[package]] name = "wat" -version = "1.207.0" +version = "1.209.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8eb2b15e2d5f300f5e1209e7dc237f2549edbd4203655b6c6cab5cf180561ee7" +checksum = "42203ec0271d113f8eb1f77ebc624886530cecb35915a7f63a497131f16e4d24" dependencies = [ "wast", ] @@ -6859,9 +6865,9 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.26.1" +version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3de34ae270483955a94f4b21bdaaeb83d508bb84a01435f393818edb0012009" +checksum = "3c452ad30530b54a4d8e71952716a212b08efd0f3562baa66c29a618b07da7c3" dependencies = [ "rustls-pki-types", ] @@ -7056,9 +7062,9 @@ dependencies = [ [[package]] name = "winnow" -version = "0.6.8" +version = "0.6.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3c52e9c97a68071b23e836c9380edae937f17b9c4667bd021973efc689f618d" +checksum = "59b5e5f6c299a3c7890b876a2a587f3115162487e704907d9b6cd29473052ba1" dependencies = [ "memchr", ] @@ -7152,9 +7158,9 @@ dependencies = [ [[package]] name = "zip" -version = "2.1.0" +version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2568cd0f20e86cd9a7349fe05178f7bd22f22724678448ae5a9bac266df2689" +checksum = "775a2b471036342aa69bc5a602bc889cb0a06cda00477d0c69566757d5553d39" dependencies = [ "arbitrary", "crc32fast", diff --git a/Cargo.toml b/Cargo.toml index ae5831c18a0..a69f9220ace 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,6 +71,7 @@ futures = { version = "0.3.30", default-features = false } tokio = "1.38.0" tokio-stream = "0.1.15" tokio-tungstenite = "0.21.0" +tokio-util = "0.7.11" tungstenite = "0.21.0" crossbeam-queue = "0.3.11" parking_lot = { version = "0.12.3" } diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 6889d9baa09..7ee27380498 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -30,9 +30,6 @@ dev-telemetry = ["telemetry", "iroha_telemetry/dev-telemetry", "iroha_logger/tok # Support schema generation from the `schema` endpoint in the local binary. # Useful for debugging issues with decoding in SDKs. schema-endpoint = ["iroha_torii/schema"] -# Support internal testing infrastructure for integration tests. -# Disable in production. 
-test-network = ["thread-local-panic-hook"] [badges] is-it-maintained-issue-resolution = { repository = "https://github.com/hyperledger/iroha" } @@ -59,13 +56,12 @@ error-stack = { workspace = true, features = ["eyre"] } thiserror = { workspace = true } tracing = { workspace = true } tokio = { workspace = true, features = ["macros", "signal"] } +tokio-util = { workspace = true, features = ["rt"] } once_cell = { workspace = true } owo-colors = { workspace = true, features = ["supports-colors"] } supports-color = { workspace = true } toml = { workspace = true } -thread-local-panic-hook = { version = "0.1.0", optional = true } - [dev-dependencies] serial_test = "3.1.1" tempfile = { workspace = true } @@ -84,6 +80,5 @@ vergen = { workspace = true, features = ["cargo"] } denylist = [ "schema-endpoint", "telemetry", - "test-network", ] skip_optional_dependencies = true diff --git a/cli/src/lib.rs b/cli/src/lib.rs index 027a9189aef..8ed6479180b 100644 --- a/cli/src/lib.rs +++ b/cli/src/lib.rs @@ -7,6 +7,7 @@ #[cfg(debug_assertions)] use core::sync::atomic::{AtomicBool, Ordering}; use std::{ + future::Future, path::{Path, PathBuf}, sync::Arc, }; @@ -22,20 +23,18 @@ use iroha_core::metrics::MetricsReporter; use iroha_core::{ block_sync::{BlockSynchronizer, BlockSynchronizerHandle}, gossiper::{TransactionGossiper, TransactionGossiperHandle}, - handler::ThreadHandler, kiso::KisoHandle, kura::Kura, query::store::LiveQueryStore, queue::Queue, smartcontracts::isi::Registrable as _, - snapshot::{ - try_read_snapshot, SnapshotMaker, SnapshotMakerHandle, TryReadError as TryReadSnapshotError, - }, + snapshot::{try_read_snapshot, SnapshotMaker, TryReadError as TryReadSnapshotError}, state::{State, StateReadOnly, World}, sumeragi::{GenesisWithPubKey, SumeragiHandle, SumeragiMetrics, SumeragiStartArgs}, IrohaNetwork, }; use iroha_data_model::prelude::*; +use iroha_futures::supervisor::{ShutdownSignal, Supervisor}; use iroha_genesis::GenesisTransaction; use iroha_logger::{actor::LoggerHandle, InitConfig as LoggerInitConfig}; use iroha_primitives::addr::SocketAddr; @@ -43,52 +42,30 @@ use iroha_torii::Torii; use iroha_version::scale::DecodeVersioned; use thiserror::Error; use tokio::{ - signal, - sync::{broadcast, mpsc, Notify}, + sync::{broadcast, mpsc}, task, }; // FIXME: move from CLI pub mod samples; +const EVENTS_BUFFER_CAPACITY: usize = 10_000; + /// Iroha is an /// [Orchestrator](https://en.wikipedia.org/wiki/Orchestration_%28computing%29) /// of the system. It configures, coordinates and manages transactions /// and queries processing, work of consensus and storage. pub struct Iroha { - /// Actor responsible for the configuration - _kiso: KisoHandle, - /// Queue of transactions - _queue: Arc, - /// Sumeragi consensus - _sumeragi: SumeragiHandle, /// Kura — block storage kura: Arc, - /// Snapshot service. Might be not started depending on the config. - _snapshot_maker: Option, /// State of blockchain state: Arc, - /// Shutdown signal - notify_shutdown: Arc, - /// Thread handlers - thread_handlers: Vec, /// A boolean value indicating whether or not the peers will receive data from the network. /// Used in sumeragi testing. #[cfg(debug_assertions)] pub freeze_status: Arc, } -impl Drop for Iroha { - fn drop(&mut self) { - iroha_logger::trace!("Iroha instance dropped"); - self.notify_shutdown.notify_waiters(); - let _thread_handles = core::mem::take(&mut self.thread_handlers); - iroha_logger::debug!( - "Thread handles dropped. 
Dependent processes going for a graceful shutdown" - ) - } -} - /// Error(s) that might occur while starting [`Iroha`] #[derive(thiserror::Error, Debug, Copy, Clone)] #[allow(missing_docs)] @@ -110,18 +87,13 @@ pub enum StartError { struct NetworkRelay { sumeragi: SumeragiHandle, block_sync: BlockSynchronizerHandle, - gossiper: TransactionGossiperHandle, + tx_gossiper: TransactionGossiperHandle, network: IrohaNetwork, - shutdown_notify: Arc, #[cfg(debug_assertions)] freeze_status: Arc, } impl NetworkRelay { - fn start(self) { - tokio::task::spawn(self.run()); - } - async fn run(mut self) { let (sender, mut receiver) = mpsc::channel(1); self.network.subscribe_to_peers_messages(sender); @@ -129,13 +101,12 @@ impl NetworkRelay { #[allow(clippy::redundant_pub_crate)] loop { tokio::select! { - // Receive message from network + // Receive a message from the network Some(msg) = receiver.recv() => self.handle_message(msg).await, - () = self.shutdown_notify.notified() => { - iroha_logger::info!("NetworkRelay is being shut down."); + else => { + iroha_logger::debug!("Exiting the network relay"); break; - } - else => break, + }, } tokio::task::yield_now().await; } @@ -157,83 +128,41 @@ impl NetworkRelay { self.sumeragi.incoming_control_flow_message(*data); } BlockSync(data) => self.block_sync.message(*data).await, - TransactionGossiper(data) => self.gossiper.gossip(*data).await, + TransactionGossiper(data) => self.tx_gossiper.gossip(*data).await, Health => {} } } } impl Iroha { - fn prepare_panic_hook(notify_shutdown: Arc) { - #[cfg(not(feature = "test-network"))] - use std::panic::set_hook; - - // This is a hot-fix for tests - // - // # Problem - // - // When running tests in parallel `std::panic::set_hook()` will be set - // the same for all threads. That means, that panic in one test can - // cause another test shutdown, which we don't want. - // - // # Downside - // - // A downside of this approach is that this panic hook will not work for - // threads created by Iroha itself (e.g. Sumeragi thread). - // - // # TODO - // - // Remove this when all Rust integrations tests will be converted to a - // separate Python tests. - #[cfg(feature = "test-network")] - use thread_local_panic_hook::set_hook; - - set_hook(Box::new(move |info| { - // What clippy suggests is much less readable in this case - #[allow(clippy::option_if_let_else)] - let panic_message = if let Some(message) = info.payload().downcast_ref::<&str>() { - message - } else if let Some(message) = info.payload().downcast_ref::() { - message - } else { - "unspecified" - }; - - let location = info.location().map_or_else( - || "unspecified".to_owned(), - |location| format!("{}:{}", location.file(), location.line()), - ); - - iroha_logger::error!(panic_message, location, "A panic occurred, shutting down"); - - // NOTE: shutdown all currently listening waiters - notify_shutdown.notify_waiters(); - })); - } - - /// Creates new Iroha instance and starts all internal services. + /// Starts Iroha with all its subsystems. /// - /// Returns iroha itself and future to await for iroha completion. + /// Returns iroha itself and a future of system shutdown. /// /// # Errors /// - Reading telemetry configs /// - Telemetry setup /// - Initialization of [`Sumeragi`] and [`Kura`] - /// - /// # Side Effects - /// - Sets global panic hook #[allow(clippy::too_many_lines)] - #[iroha_logger::log(name = "init", skip_all)] // This is actually easier to understand as a linear sequence of init statements. 
- pub async fn start_network( + #[iroha_logger::log(name = "start", skip_all)] // This is actually easier to understand as a linear sequence of init statements. + pub async fn start( config: Config, genesis: Option, logger: LoggerHandle, - ) -> Result<(impl core::future::Future, Self), StartError> { - let network = IrohaNetwork::start(config.common.key_pair.clone(), config.network.clone()) - .await - .change_context(StartError::StartP2p)?; + shutdown_signal: ShutdownSignal, + ) -> Result< + ( + Self, + impl Future>, + ), + StartError, + > { + let supervisor = Supervisor::new(); + + let (kura, block_count) = Kura::new(&config.kura).change_context(StartError::InitKura)?; + let child = Kura::start(kura.clone(), supervisor.shutdown_signal()); + supervisor.monitor(child); - let (events_sender, _) = broadcast::channel(10000); let world = World::with( [genesis_domain(config.genesis.public_key.clone())], config @@ -244,14 +173,14 @@ impl Iroha { .into_non_empty_vec(), ); - let (kura, block_count) = Kura::new(&config.kura).change_context(StartError::InitKura)?; - let kura_thread_handler = Kura::start(Arc::clone(&kura)); - let live_query_store_handle = LiveQueryStore::from_config(config.live_query_store).start(); + let (live_query_store, child) = + LiveQueryStore::from_config(config.live_query_store).start(); + supervisor.monitor(child); let state = match try_read_snapshot( config.snapshot.store_dir.resolve_relative_path(), &kura, - live_query_store_handle.clone(), + || live_query_store.clone(), block_count, ) { Ok(state) => { @@ -274,15 +203,22 @@ impl Iroha { config.chain_wide, world, Arc::clone(&kura), - live_query_store_handle.clone(), + live_query_store.clone(), ) }); let state = Arc::new(state); + let (events_sender, _) = broadcast::channel(EVENTS_BUFFER_CAPACITY); let queue = Arc::new(Queue::from_config(config.queue, events_sender.clone())); + let (network, child) = + IrohaNetwork::start(config.common.key_pair.clone(), config.network.clone()) + .await + .change_context(StartError::StartP2p)?; + supervisor.monitor(child); + #[cfg(feature = "telemetry")] - Self::start_telemetry(&logger, &config).await?; + start_telemetry(&logger, &config, &supervisor).await?; #[cfg(feature = "telemetry")] let metrics_reporter = MetricsReporter::new( @@ -292,13 +228,13 @@ impl Iroha { queue.clone(), ); - let start_args = SumeragiStartArgs { + let (sumeragi, child) = SumeragiStartArgs { sumeragi_config: config.sumeragi.clone(), common_config: config.common.clone(), events_sender: events_sender.clone(), - state: Arc::clone(&state), - queue: Arc::clone(&queue), - kura: Arc::clone(&kura), + state: state.clone(), + queue: queue.clone(), + kura: kura.clone(), network: network.clone(), genesis_network: GenesisWithPubKey { genesis, @@ -309,23 +245,22 @@ impl Iroha { dropped_messages: metrics_reporter.metrics().dropped_messages.clone(), view_changes: metrics_reporter.metrics().view_changes.clone(), }, - }; - // Starting Sumeragi requires no async context enabled - let sumeragi = task::spawn_blocking(move || SumeragiHandle::start(start_args)) - .await - .expect("Failed to join task with Sumeragi start"); + } + .start(supervisor.shutdown_signal()); + supervisor.monitor(child); - let block_sync = BlockSynchronizer::from_config( + let (block_sync, child) = BlockSynchronizer::from_config( &config.block_sync, sumeragi.clone(), - Arc::clone(&kura), + kura.clone(), config.common.peer_id(), network.clone(), Arc::clone(&state), ) .start(); + supervisor.monitor(child); - let gossiper = TransactionGossiper::from_config( + let 
(tx_gossiper, child) = TransactionGossiper::from_config( config.common.chain_id.clone(), config.transaction_gossiper, network.clone(), @@ -333,179 +268,68 @@ impl Iroha { Arc::clone(&state), ) .start(); + supervisor.monitor(child); #[cfg(debug_assertions)] let freeze_status = Arc::new(AtomicBool::new(false)); - let notify_shutdown = Arc::new(Notify::new()); + supervisor.monitor(task::spawn( + NetworkRelay { + sumeragi, + block_sync, + tx_gossiper, + network, + #[cfg(debug_assertions)] + freeze_status: freeze_status.clone(), + } + .run(), + )); - NetworkRelay { - sumeragi: sumeragi.clone(), - block_sync, - gossiper, - network: network.clone(), - shutdown_notify: Arc::clone(¬ify_shutdown), - #[cfg(debug_assertions)] - freeze_status: freeze_status.clone(), + if let Some(snapshot_maker) = + SnapshotMaker::from_config(&config.snapshot, Arc::clone(&state)) + { + supervisor.monitor(snapshot_maker.start(supervisor.shutdown_signal())); } - .start(); - let snapshot_maker = SnapshotMaker::from_config(&config.snapshot, Arc::clone(&state)) - .map(SnapshotMaker::start); + let (kiso, child) = KisoHandle::start(config.clone()); + supervisor.monitor(child); - let kiso = KisoHandle::new(config.clone()); - - let torii = Torii::new( + let children = Torii::new( config.common.chain_id.clone(), kiso.clone(), config.torii, - Arc::clone(&queue), + queue, events_sender, - Arc::clone(¬ify_shutdown), - live_query_store_handle, - Arc::clone(&kura), - Arc::clone(&state), + live_query_store, + kura.clone(), + state.clone(), #[cfg(feature = "telemetry")] metrics_reporter, - ); - - tokio::spawn(async move { - torii - .start() - .await - .into_report() - .map_err(|report| report.change_context(StartError::StartTorii)) - }); - - Self::spawn_config_updates_broadcasting(kiso.clone(), logger.clone()); - - Self::start_listening_signal(Arc::clone(¬ify_shutdown))?; - - Self::prepare_panic_hook(Arc::clone(¬ify_shutdown)); - - // Future to wait for iroha completion - let wait = { - let notify_shutdown = Arc::clone(¬ify_shutdown); - async move { notify_shutdown.notified().await } - }; - - let irohad = Self { - _kiso: kiso, - _queue: queue, - _sumeragi: sumeragi, - kura, - _snapshot_maker: snapshot_maker, - state, - notify_shutdown, - thread_handlers: vec![kura_thread_handler], - #[cfg(debug_assertions)] - freeze_status, - }; - - Ok((wait, irohad)) - } - - #[cfg(feature = "telemetry")] - async fn start_telemetry(logger: &LoggerHandle, config: &Config) -> Result<(), StartError> { - const MSG_SUBSCRIBE: &str = "unable to subscribe to the channel"; - const MSG_START_TASK: &str = "unable to start the task"; - - #[cfg(feature = "dev-telemetry")] - { - if let Some(out_file) = &config.dev_telemetry.out_file { - let receiver = logger - .subscribe_on_telemetry(iroha_logger::telemetry::Channel::Future) - .await - .change_context(StartError::StartDevTelemetry) - .attach_printable(MSG_SUBSCRIBE)?; - let _handle = iroha_telemetry::dev::start_file_output( - out_file.resolve_relative_path(), - receiver, - ) - .await - .into_report() - .map_err(|report| report.change_context(StartError::StartDevTelemetry)) - .attach_printable(MSG_START_TASK)?; - } - } - - if let Some(config) = &config.telemetry { - let receiver = logger - .subscribe_on_telemetry(iroha_logger::telemetry::Channel::Regular) - .await - .change_context(StartError::StartTelemetry) - .attach_printable(MSG_SUBSCRIBE)?; - let _handle = iroha_telemetry::ws::start(config.clone(), receiver) - .await - .into_report() - .map_err(|report| report.change_context(StartError::StartTelemetry)) - 
.attach_printable(MSG_START_TASK)?; - iroha_logger::info!("Telemetry started"); - Ok(()) - } else { - iroha_logger::info!("Telemetry not started due to absent configuration"); - Ok(()) + ) + .start(&supervisor.shutdown_signal()) + .into_report() + .map_err(|report| report.change_context(StartError::StartTorii))?; + for i in children { + supervisor.monitor(i); } - } - fn start_listening_signal( - notify_shutdown: Arc, - ) -> Result, StartError> { - let (mut sigint, mut sigterm) = signal::unix::signal(signal::unix::SignalKind::interrupt()) - .and_then(|sigint| { - let sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())?; + supervisor.monitor(tokio::task::spawn(config_updates_relay(kiso, logger))); - Ok((sigint, sigterm)) - }) + supervisor + .setup_shutdown_on_os_signals() .change_context(StartError::ListenOsSignal)?; - // NOTE: Triggered by tokio::select - #[allow(clippy::redundant_pub_crate)] - let handle = task::spawn(async move { - tokio::select! { - _ = sigint.recv() => { - iroha_logger::info!("SIGINT received, shutting down..."); - }, - _ = sigterm.recv() => { - iroha_logger::info!("SIGTERM received, shutting down..."); - }, - } - - // NOTE: shutdown all currently listening waiters - notify_shutdown.notify_waiters(); - }); - - Ok(handle) - } + supervisor.shutdown_on_external_signal(shutdown_signal); - /// Spawns a task which subscribes on updates from configuration actor - /// and broadcasts them further to interested actors. This way, neither config actor nor other ones know - /// about each other, achieving loose coupling of code and system. - fn spawn_config_updates_broadcasting( - kiso: KisoHandle, - logger: LoggerHandle, - ) -> task::JoinHandle<()> { - tokio::spawn(async move { - let mut log_level_update = kiso - .subscribe_on_log_level() - .await - // FIXME: don't like neither the message nor inability to throw Result to the outside - .expect("Cannot proceed without working subscriptions"); - - // See https://github.com/tokio-rs/tokio/issues/5616 and - // https://github.com/rust-lang/rust-clippy/issues/10636 - #[allow(clippy::redundant_pub_crate)] - loop { - tokio::select! 
{ - Ok(()) = log_level_update.changed() => { - let value = *log_level_update.borrow_and_update(); - if let Err(error) = logger.reload_level(value).await { - iroha_logger::error!("Failed to reload log level: {error}"); - }; - } - }; - } - }) + Ok(( + Self { + kura, + state, + #[cfg(debug_assertions)] + freeze_status, + }, + supervisor.wait_all(), + )) } #[allow(missing_docs)] @@ -525,6 +349,82 @@ impl Iroha { } } +#[cfg(feature = "telemetry")] +async fn start_telemetry( + logger: &LoggerHandle, + config: &Config, + supervisor: &Supervisor, +) -> Result<(), StartError> { + const MSG_SUBSCRIBE: &str = "unable to subscribe to the channel"; + const MSG_START_TASK: &str = "unable to start the task"; + + #[cfg(feature = "dev-telemetry")] + { + if let Some(out_file) = &config.dev_telemetry.out_file { + let receiver = logger + .subscribe_on_telemetry(iroha_logger::telemetry::Channel::Future) + .await + .change_context(StartError::StartDevTelemetry) + .attach_printable(MSG_SUBSCRIBE)?; + let handle = + iroha_telemetry::dev::start_file_output(out_file.resolve_relative_path(), receiver) + .await + .into_report() + .map_err(|report| report.change_context(StartError::StartDevTelemetry)) + .attach_printable(MSG_START_TASK)?; + supervisor.monitor(handle); + } + } + + if let Some(config) = &config.telemetry { + let receiver = logger + .subscribe_on_telemetry(iroha_logger::telemetry::Channel::Regular) + .await + .change_context(StartError::StartTelemetry) + .attach_printable(MSG_SUBSCRIBE)?; + let handle = iroha_telemetry::ws::start(config.clone(), receiver) + .await + .into_report() + .map_err(|report| report.change_context(StartError::StartTelemetry)) + .attach_printable(MSG_START_TASK)?; + supervisor.monitor(handle); + iroha_logger::info!("Telemetry started"); + Ok(()) + } else { + iroha_logger::info!("Telemetry not started due to absent configuration"); + Ok(()) + } +} + +/// Subscribes to updates from the configuration actor and broadcasts them +/// further to interested actors (the caller spawns this as a task). This way, neither the config actor nor other ones know +/// about each other, achieving loose coupling of code and system. +async fn config_updates_relay(kiso: KisoHandle, logger: LoggerHandle) { + let mut log_level_update = kiso + .subscribe_on_log_level() + .await + // FIXME: neither the error message nor the inability to pass the Result to the caller is ideal + .expect("Cannot proceed without working subscriptions"); + + // See https://github.com/tokio-rs/tokio/issues/5616 and + // https://github.com/rust-lang/rust-clippy/issues/10636 + #[allow(clippy::redundant_pub_crate)] + loop { + tokio::select!
{ Ok(()) = log_level_update.changed() => { let value = *log_level_update.borrow_and_update(); if let Err(error) = logger.reload_level(value).await { iroha_logger::error!("Failed to reload log level: {error}"); }; } else => { iroha_logger::debug!("Exiting config updates relay"); break; } }; } } fn genesis_account(public_key: PublicKey) -> Account { let genesis_account_id = AccountId::new(iroha_genesis::GENESIS_DOMAIN_ID.clone(), public_key); Account::new(genesis_account_id.clone()).build(&genesis_account_id) } @@ -776,31 +676,6 @@ mod tests { use super::*; - #[cfg(not(feature = "test-network"))] - mod no_test_network { - use std::{iter::repeat, panic, thread}; - - use futures::future::join_all; - use serial_test::serial; - - use super::*; - - #[tokio::test] - #[serial] - async fn iroha_should_notify_on_panic() { - let notify = Arc::new(Notify::new()); - let hook = panic::take_hook(); - >::prepare_panic_hook(Arc::clone(&notify)); - let waiters: Vec<_> = repeat(()).take(10).map(|_| Arc::clone(&notify)).collect(); - let handles: Vec<_> = waiters.iter().map(|waiter| waiter.notified()).collect(); - thread::spawn(move || { - panic!("Test panic"); - }); - join_all(handles).await; - panic::set_hook(hook); - } - } - mod config_integration { use assertables::{assert_contains, assert_contains_as_result}; use iroha_crypto::{ExposedPrivateKey, KeyPair}; diff --git a/cli/src/main.rs b/cli/src/main.rs index f8033d61680..ff435184217 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -3,6 +3,7 @@ use std::env; use clap::Parser; use error_stack::{IntoReportCompat, ResultExt}; +use iroha_futures::supervisor::ShutdownSignal; use irohad::{Args, Iroha}; #[derive(thiserror::Error, Debug)] @@ -13,8 +14,10 @@ enum MainError { Config, #[error("Could not initialize logger")] Logger, - #[error("Could not start Iroha")] + #[error("Failed to start Iroha")] IrohaStart, + #[error("Error occurred while running Iroha")] + IrohaRun, } #[tokio::main] @@ -51,13 +54,19 @@ async fn main() -> error_stack::Result<(), MainError> { iroha_logger::debug!("Submitting genesis."); } - Iroha::start_network(config, genesis, logger) - .await - .change_context(MainError::IrohaStart)?
- .0 - .await; + let shutdown_on_panic = ShutdownSignal::new(); + let default_hook = std::panic::take_hook(); + let signal_clone = shutdown_on_panic.clone(); + std::panic::set_hook(Box::new(move |info| { + iroha_logger::error!("Panic occurred, shutting down Iroha gracefully..."); + signal_clone.send(); + default_hook(info); + })); - Ok(()) + let (_iroha, supervisor_fut) = Iroha::start(config, genesis, logger, shutdown_on_panic) + .await + .change_context(MainError::IrohaStart)?; + supervisor_fut.await.change_context(MainError::IrohaRun) } /// Configures globals of [`error_stack::Report`] diff --git a/client/tests/integration/extra_functional/restart_peer.rs b/client/tests/integration/extra_functional/restart_peer.rs index 2b919073c68..d86abf958bb 100644 --- a/client/tests/integration/extra_functional/restart_peer.rs +++ b/client/tests/integration/extra_functional/restart_peer.rs @@ -60,7 +60,7 @@ fn restarted_peer_should_have_the_same_asset_amount() -> Result<()> { .collect(); let removed_peer_idx = rand::thread_rng().gen_range(0..all_peers.len()); let mut removed_peer = all_peers.swap_remove(removed_peer_idx); - removed_peer.stop(); + removed_peer.terminate(); removed_peer }; // All peers have been stopped here diff --git a/core/Cargo.toml b/core/Cargo.toml index 46b8ff1cc1d..6c2c336404c 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -109,6 +109,5 @@ path = "benches/blocks/validate_blocks_oneshot.rs" denylist = [ "schema-endpoint", "telemetry", - "test-network" ] skip_optional_dependencies = true diff --git a/core/benches/blocks/common.rs b/core/benches/blocks/common.rs index 851dd6592c2..f119dc16442 100644 --- a/core/benches/blocks/common.rs +++ b/core/benches/blocks/common.rs @@ -168,7 +168,7 @@ pub fn build_state(rt: &tokio::runtime::Handle, account_id: &AccountId) -> State let kura = iroha_core::kura::Kura::blank_kura_for_testing(); let query_handle = { let _guard = rt.enter(); - LiveQueryStore::test().start() + LiveQueryStore::start_test() }; let mut domain = Domain::new(account_id.domain_id.clone()).build(account_id); domain.accounts.insert( diff --git a/core/benches/kura.rs b/core/benches/kura.rs index 89d404be119..c0c18f15384 100644 --- a/core/benches/kura.rs +++ b/core/benches/kura.rs @@ -15,6 +15,7 @@ use iroha_core::{ }; use iroha_crypto::KeyPair; use iroha_data_model::{prelude::*, transaction::TransactionLimits}; +use iroha_futures::supervisor::ShutdownSignal; use iroha_primitives::unique_vec::UniqueVec; use test_samples::gen_account_in; use tokio::{fs, runtime::Runtime}; @@ -43,9 +44,9 @@ async fn measure_block_size_for_n_executors(n_executors: u32) { store_dir: WithOrigin::inline(dir.path().to_path_buf()), }; let (kura, _) = iroha_core::kura::Kura::new(&cfg).unwrap(); - let _thread_handle = iroha_core::kura::Kura::start(kura.clone()); + let _thread_handle = iroha_core::kura::Kura::start(kura.clone(), ShutdownSignal::new()); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = State::new(World::new(), kura, query_handle); let topology = Topology::new(UniqueVec::new()); let mut block = { diff --git a/core/benches/validation.rs b/core/benches/validation.rs index 7d0fe5d973c..ddce748209e 100644 --- a/core/benches/validation.rs +++ b/core/benches/validation.rs @@ -43,7 +43,7 @@ fn build_test_transaction(chain_id: ChainId) -> SignedTransaction { fn build_test_and_transient_state() -> State { let kura = iroha_core::kura::Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let 
query_handle = LiveQueryStore::start_test(); let state = State::new( { @@ -148,7 +148,7 @@ fn sign_blocks(criterion: &mut Criterion) { .expect("Failed to accept transaction."); let key_pair = KeyPair::random(); let kura = iroha_core::kura::Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = State::new(World::new(), kura, query_handle); let topology = Topology::new(UniqueVec::new()); diff --git a/core/src/block.rs b/core/src/block.rs index 711ef6431d8..afee72272f7 100644 --- a/core/src/block.rs +++ b/core/src/block.rs @@ -827,7 +827,7 @@ mod tests { assert!(domain.add_account(account).is_none()); let world = World::with([domain], UniqueVec::new()); let kura = Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = State::new(world, kura, query_handle); let mut state_block = state.block(); @@ -882,7 +882,7 @@ mod tests { assert!(domain.add_account(account).is_none()); let world = World::with([domain], UniqueVec::new()); let kura = Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = State::new(world, kura, query_handle); let mut state_block = state.block(); @@ -958,7 +958,7 @@ mod tests { ); let world = World::with([domain], UniqueVec::new()); let kura = Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = State::new(world, kura, query_handle); let mut state_block = state.block(); let transaction_limits = &state_block.transaction_executor().transaction_limits; @@ -1039,7 +1039,7 @@ mod tests { assert!(genesis_domain.add_account(genesis_wrong_account).is_none(),); let world = World::with([genesis_domain], UniqueVec::new()); let kura = Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = State::new(world, kura, query_handle); let mut state_block = state.block(); diff --git a/core/src/block_sync.rs b/core/src/block_sync.rs index ef7f5b8c10a..7a69227f237 100644 --- a/core/src/block_sync.rs +++ b/core/src/block_sync.rs @@ -4,6 +4,7 @@ use std::{fmt::Debug, num::NonZeroU32, sync::Arc, time::Duration}; use iroha_config::parameters::actual::BlockSync as Config; use iroha_crypto::HashOf; use iroha_data_model::{block::SignedBlock, prelude::*}; +use iroha_futures::supervisor::{Child, OnShutdown}; use iroha_logger::prelude::*; use iroha_macro::*; use iroha_p2p::Post; @@ -48,10 +49,12 @@ pub struct BlockSynchronizer { impl BlockSynchronizer { /// Start [`Self`] actor. - pub fn start(self) -> BlockSynchronizerHandle { + pub fn start(self) -> (BlockSynchronizerHandle, Child) { let (message_sender, message_receiver) = mpsc::channel(1); - tokio::task::spawn(self.run(message_receiver)); - BlockSynchronizerHandle { message_sender } + ( + BlockSynchronizerHandle { message_sender }, + Child::new(tokio::spawn(self.run(message_receiver)), OnShutdown::Abort), + ) } /// [`Self`] task. @@ -60,13 +63,13 @@ impl BlockSynchronizer { loop { tokio::select! { _ = gossip_period.tick() => self.request_block().await, - msg = message_receiver.recv() => { - let Some(msg) = msg else { - info!("All handler to BlockSynchronizer are dropped. 
Shutting down..."); - break; - }; + Some(msg) = message_receiver.recv() => { msg.handle_message(&mut self).await; } + else => { + debug!("Shutting down block sync"); + break; + }, } tokio::task::yield_now().await; } diff --git a/core/src/gossiper.rs b/core/src/gossiper.rs index 22b67135aee..79750d2a9ef 100644 --- a/core/src/gossiper.rs +++ b/core/src/gossiper.rs @@ -4,6 +4,7 @@ use std::{num::NonZeroU32, sync::Arc, time::Duration}; use iroha_config::parameters::actual::TransactionGossiper as Config; use iroha_data_model::{transaction::SignedTransaction, ChainId}; +use iroha_futures::supervisor::{Child, OnShutdown}; use iroha_p2p::Broadcast; use parity_scale_codec::{Decode, Encode}; use tokio::sync::mpsc; @@ -46,10 +47,15 @@ pub struct TransactionGossiper { impl TransactionGossiper { /// Start [`Self`] actor. - pub fn start(self) -> TransactionGossiperHandle { + pub fn start(self) -> (TransactionGossiperHandle, Child) { let (message_sender, message_receiver) = mpsc::channel(1); - tokio::task::spawn(self.run(message_receiver)); - TransactionGossiperHandle { message_sender } + ( + TransactionGossiperHandle { message_sender }, + Child::new( + tokio::task::spawn(self.run(message_receiver)), + OnShutdown::Abort, + ), + ) } /// Construct [`Self`] from configuration @@ -78,13 +84,13 @@ impl TransactionGossiper { loop { tokio::select! { _ = gossip_period.tick() => self.gossip_transactions(), - transaction_gossip = message_receiver.recv() => { - let Some(transaction_gossip) = transaction_gossip else { - iroha_logger::info!("All handler to Gossiper are dropped. Shutting down..."); - break; - }; + Some(transaction_gossip) = message_receiver.recv() => { self.handle_transaction_gossip(transaction_gossip); } + else => { + iroha_logger::info!("Shutting down transactions gossiper"); + break; + }, } tokio::task::yield_now().await; } diff --git a/core/src/kiso.rs b/core/src/kiso.rs index 67837c12816..7a4051e1c52 100644 --- a/core/src/kiso.rs +++ b/core/src/kiso.rs @@ -12,6 +12,7 @@ use iroha_config::{ client_api::{ConfigDTO, Logger as LoggerDTO}, parameters::actual::Root as Config, }; +use iroha_futures::supervisor::{Child, OnShutdown}; use iroha_logger::Level; use tokio::sync::{mpsc, oneshot, watch}; @@ -27,7 +28,7 @@ pub struct KisoHandle { impl KisoHandle { /// Spawn a new actor - pub fn new(state: Config) -> Self { + pub fn start(state: Config) -> (Self, Child) { let (actor_sender, actor_receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE); let (log_level_update, _) = watch::channel(state.logger.level); let mut actor = Actor { @@ -35,11 +36,15 @@ impl KisoHandle { state, log_level_update, }; - tokio::spawn(async move { actor.run().await }); - - Self { - actor: actor_sender, - } + ( + Self { + actor: actor_sender, + }, + Child::new( + tokio::spawn(async move { actor.run().await }), + OnShutdown::Abort, + ), + ) } /// Fetch the [`ConfigDTO`] from the actor's state. 
@@ -176,7 +181,7 @@ mod tests { let mut config = test_config(); config.logger.level = INIT_LOG_LEVEL; - let kiso = KisoHandle::new(config); + let (kiso, _) = KisoHandle::start(config); let mut recv = kiso .subscribe_on_log_level() diff --git a/core/src/kura.rs b/core/src/kura.rs index b9d37c8d104..9e7c2c784a7 100644 --- a/core/src/kura.rs +++ b/core/src/kura.rs @@ -8,17 +8,19 @@ use std::{ io::{BufWriter, Read, Seek, SeekFrom, Write}, path::{Path, PathBuf}, sync::Arc, + time::Duration, }; use iroha_config::{kura::InitMode, parameters::actual::Kura as Config}; use iroha_crypto::{Hash, HashOf}; use iroha_data_model::block::SignedBlock; +use iroha_futures::supervisor::{spawn_os_thread_as_future, Child, OnShutdown, ShutdownSignal}; use iroha_logger::prelude::*; use iroha_version::scale::{DecodeVersioned, EncodeVersioned}; use parity_scale_codec::DecodeAll; use parking_lot::Mutex; -use crate::{block::CommittedBlock, handler::ThreadHandler}; +use crate::block::CommittedBlock; const INDEX_FILE_NAME: &str = "blocks.index"; const DATA_FILE_NAME: &str = "blocks.data"; @@ -82,21 +84,16 @@ impl Kura { } /// Start the Kura thread - pub fn start(kura: Arc) -> ThreadHandler { - // Oneshot channel to allow forcefully stopping the thread. - let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel(); - - let thread_handle = std::thread::spawn(move || { - Self::kura_receive_blocks_loop(&kura, shutdown_receiver); - }); - - let shutdown = move || { - if let Err(error) = shutdown_sender.send(()) { - iroha_logger::error!(?error); - } - }; - - ThreadHandler::new(Box::new(shutdown), thread_handle) + pub fn start(kura: Arc, shutdown_signal: ShutdownSignal) -> Child { + Child::new( + tokio::task::spawn(spawn_os_thread_as_future( + std::thread::Builder::new().name("kura".to_owned()), + move || { + Self::kura_receive_blocks_loop(&kura, &shutdown_signal); + }, + )), + OnShutdown::Wait(Duration::from_secs(5)), + ) } /// Initialize [`Kura`] after its construction to be able to work with it. @@ -191,10 +188,7 @@ impl Kura { } #[iroha_logger::log(skip_all)] - fn kura_receive_blocks_loop( - kura: &Kura, - mut shutdown_receiver: tokio::sync::oneshot::Receiver<()>, - ) { + fn kura_receive_blocks_loop(kura: &Kura, shutdown_signal: &ShutdownSignal) { let (mut written_block_count, mut latest_block_hash) = { let block_data_guard = kura.block_data.lock(); (block_data_guard.len(), block_data_guard.last().map(|d| d.0)) @@ -202,7 +196,7 @@ impl Kura { let mut should_exit = false; loop { // If kura receive shutdown then close block channel and write remaining blocks to the storage - if shutdown_receiver.try_recv().is_ok() { + if shutdown_signal.is_sent() { info!("Kura block thread is being shut down. Writing remaining blocks to store."); should_exit = true; } diff --git a/core/src/lib.rs b/core/src/lib.rs index 5a3e1bb4be6..fe39a073c89 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -60,43 +60,6 @@ pub enum NetworkMessage { Health, } -pub mod handler { - //! General purpose thread handler. It is responsible for RAII for - //! threads started for Kura, Sumeragi and other core routines. 
- use std::thread::JoinHandle; - - /// Call shutdown function and join thread on drop - pub struct ThreadHandler { - /// Shutdown function: after calling it, the thread must terminate in finite amount of time - shutdown: Option>, - handle: Option>, - } - - impl ThreadHandler { - /// [`Self`] constructor - #[must_use] - #[inline] - pub fn new(shutdown: Box, handle: JoinHandle<()>) -> Self { - Self { - shutdown: Some(shutdown), - handle: Some(handle), - } - } - } - - impl Drop for ThreadHandler { - /// Join on drop to ensure that the thread is properly shut down. - fn drop(&mut self) { - (self.shutdown.take().expect("Always some after init"))(); - let handle = self.handle.take().expect("Always some after init"); - - if let Err(error) = handle.join() { - iroha_logger::error!(?error, "Fatal error: thread panicked"); - } - } - } -} - pub mod role { //! Module with extension for [`RoleId`] to be stored inside state. diff --git a/core/src/query/store.rs b/core/src/query/store.rs index be9da50dbd3..d6e65644c2e 100644 --- a/core/src/query/store.rs +++ b/core/src/query/store.rs @@ -11,6 +11,7 @@ use iroha_data_model::{ query::{cursor::ForwardCursor, error::QueryExecutionFail, QueryId, QueryOutputBox}, BatchedResponse, BatchedResponseV1, ValidationFail, }; +use iroha_futures::supervisor::{Child, OnShutdown}; use iroha_logger::trace; use parity_scale_codec::{Decode, Encode}; use serde::{Deserialize, Serialize}; @@ -77,52 +78,50 @@ impl LiveQueryStore { /// Default configuration will be used. /// /// Not marked as `#[cfg(test)]` because it is used in benches as well. - pub fn test() -> Self { - Self::from_config(Config::default()) + pub fn start_test() -> LiveQueryStoreHandle { + Self::from_config(Config::default()).start().0 } /// Start [`LiveQueryStore`]. Requires a [`tokio::runtime::Runtime`] being run /// as it will create new [`tokio::task`] and detach it. /// /// Returns a handle to interact with the service. - pub fn start(mut self) -> LiveQueryStoreHandle { - const ALL_HANDLERS_DROPPED: &str = - "All handler to LiveQueryStore are dropped. Shutting down..."; - + pub fn start(mut self) -> (LiveQueryStoreHandle, Child) { let (message_sender, mut message_receiver) = mpsc::channel(1); let mut idle_interval = tokio::time::interval(self.idle_time); - tokio::task::spawn(async move { - loop { - tokio::select! { - _ = idle_interval.tick() => { - self.queries - .retain(|_, (_, last_access_time)| last_access_time.elapsed() <= self.idle_time); - }, - msg = message_receiver.recv() => { - let Some(msg) = msg else { - iroha_logger::info!("{ALL_HANDLERS_DROPPED}"); - break; - }; - - match msg { - Message::Insert(query_id, live_query) => { - self.insert(query_id, live_query) - } - Message::Remove(query_id, response_sender) => { - let live_query_opt = self.remove(&query_id); - let _ = response_sender.send(live_query_opt); + let child = Child::new( + tokio::spawn(async move { + loop { + tokio::select! 
{ + _ = idle_interval.tick() => { + self.queries + .retain(|_, (_, last_access_time)| last_access_time.elapsed() <= self.idle_time); + }, + Some(msg) = message_receiver.recv() => { + match msg { + Message::Insert(query_id, live_query) => { + self.insert(query_id, live_query) + } + Message::Remove(query_id, response_sender) => { + let live_query_opt = self.remove(&query_id); + let _ = response_sender.send(live_query_opt); + } } } + else => { + iroha_logger::debug!("Terminating live query store"); + break; + }, } - else => break, + tokio::task::yield_now().await; } - tokio::task::yield_now().await; - } - }); + }), + OnShutdown::Abort, + ); - LiveQueryStoreHandle { message_sender } + (LiveQueryStoreHandle { message_sender }, child) } fn insert(&mut self, query_id: QueryId, live_query: LiveQuery) { @@ -257,9 +256,8 @@ mod tests { #[test] fn query_message_order_preserved() { - let query_store = LiveQueryStore::test(); let threaded_rt = tokio::runtime::Runtime::new().unwrap(); - let query_store_handle = threaded_rt.block_on(async { query_store.start() }); + let query_handle = threaded_rt.block_on(async { LiveQueryStore::start_test() }); for i in 0..10_000 { let filter = PredicateBox::default(); @@ -279,7 +277,7 @@ mod tests { .apply_postprocessing(&filter, &sorting, pagination, fetch_size) .unwrap(); - let (batch, mut cursor) = query_store_handle + let (batch, mut cursor) = query_handle .handle_query_output(query_output) .unwrap() .into(); @@ -289,7 +287,7 @@ mod tests { counter += v.len(); while cursor.cursor.is_some() { - let Ok(batched) = query_store_handle.handle_query_cursor(cursor) else { + let Ok(batched) = query_handle.handle_query_cursor(cursor) else { break; }; let (batch, new_cursor) = batched.into(); diff --git a/core/src/queue.rs b/core/src/queue.rs index 702e2f13233..9ffd636579f 100644 --- a/core/src/queue.rs +++ b/core/src/queue.rs @@ -464,7 +464,7 @@ pub mod tests { #[test] async fn push_tx() { let kura = Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = Arc::new(State::new(world_with_test_domains(), kura, query_handle)); let state_view = state.view(); @@ -482,7 +482,7 @@ pub mod tests { let capacity = nonzero!(10_usize); let kura: Arc = Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = Arc::new(State::new(world_with_test_domains(), kura, query_handle)); let state_view = state.view(); @@ -517,7 +517,7 @@ pub mod tests { async fn get_available_txs() { let max_txs_in_block = 2; let kura = Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = Arc::new(State::new(world_with_test_domains(), kura, query_handle)); let state_view = state.view(); @@ -544,7 +544,7 @@ pub mod tests { #[test] async fn push_tx_already_in_blockchain() { let kura = Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = State::new(world_with_test_domains(), kura, query_handle); let (_time_handle, time_source) = TimeSource::new_mock(Duration::default()); let tx = accepted_tx_by_someone(&time_source); @@ -567,7 +567,7 @@ pub mod tests { async fn get_tx_drop_if_in_blockchain() { let max_txs_in_block = 2; let kura = Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let query_handle = 
LiveQueryStore::start_test(); let state = State::new(world_with_test_domains(), kura, query_handle); let (_time_handle, time_source) = TimeSource::new_mock(Duration::default()); let tx = accepted_tx_by_someone(&time_source); @@ -589,7 +589,7 @@ pub mod tests { async fn get_available_txs_with_timeout() { let max_txs_in_block = 6; let kura = Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = Arc::new(State::new(world_with_test_domains(), kura, query_handle)); let state_view = state.view(); @@ -638,7 +638,7 @@ pub mod tests { async fn transactions_available_after_pop() { let max_txs_in_block = 2; let kura = Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = Arc::new(State::new(world_with_test_domains(), kura, query_handle)); let state_view = state.view(); @@ -672,7 +672,7 @@ pub mod tests { let max_txs_in_block = 2; let (alice_id, alice_keypair) = gen_account_in("wonderland"); let kura = Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = Arc::new(State::new(world_with_test_domains(), kura, query_handle)); let state_view = state.view(); @@ -731,7 +731,7 @@ pub mod tests { async fn concurrent_stress_test() { let max_txs_in_block = 10; let kura = Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = Arc::new(State::new(world_with_test_domains(), kura, query_handle)); let (time_handle, time_source) = TimeSource::new_mock(Duration::default()); @@ -805,7 +805,7 @@ pub mod tests { let future_threshold = Duration::from_secs(1); let kura = Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = Arc::new(State::new(world_with_test_domains(), kura, query_handle)); let state_view = state.view(); @@ -850,7 +850,7 @@ pub mod tests { assert!(domain.add_account(bob_account).is_none()); World::with([domain], PeersIds::new()) }; - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = State::new(world, kura, query_handle); let (_time_handle, time_source) = TimeSource::new_mock(Duration::default()); diff --git a/core/src/smartcontracts/isi/mod.rs b/core/src/smartcontracts/isi/mod.rs index 46acb4a1fc0..b8960ab8c4f 100644 --- a/core/src/smartcontracts/isi/mod.rs +++ b/core/src/smartcontracts/isi/mod.rs @@ -260,7 +260,7 @@ mod tests { fn state_with_test_domains(kura: &Arc) -> Result { let world = World::with([], PeersIds::new()); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = State::new(world, kura.clone(), query_handle); let asset_definition_id = AssetDefinitionId::from_str("rose#wonderland")?; let mut state_block = state.block(); diff --git a/core/src/smartcontracts/isi/query.rs b/core/src/smartcontracts/isi/query.rs index 29fd34c6c73..9b1fe495335 100644 --- a/core/src/smartcontracts/isi/query.rs +++ b/core/src/smartcontracts/isi/query.rs @@ -381,7 +381,7 @@ mod tests { let chain_id = ChainId::from("00000000-0000-0000-0000-000000000000"); let kura = Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = 
State::new(world_with_test_domains(), kura.clone(), query_handle); { let mut state_block = state.block(); @@ -447,7 +447,7 @@ mod tests { #[test] async fn asset_store() -> Result<()> { let kura = Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = State::new(world_with_test_asset_with_metadata(), kura, query_handle); let asset_definition_id = AssetDefinitionId::from_str("rose#wonderland")?; @@ -464,7 +464,7 @@ mod tests { #[test] async fn account_metadata() -> Result<()> { let kura = Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = State::new(world_with_test_account_with_metadata()?, kura, query_handle); let bytes = FindAccountKeyValueByIdAndKey::new(ALICE_ID.clone(), Name::from_str("Bytes")?) @@ -556,7 +556,7 @@ mod tests { let chain_id = ChainId::from("00000000-0000-0000-0000-000000000000"); let kura = Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = State::new(world_with_test_domains(), kura.clone(), query_handle); let mut state_block = state.block(); @@ -625,7 +625,7 @@ mod tests { AssetDefinition::numeric(asset_definition_id).build(&ALICE_ID) ) .is_none()); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); State::new(World::with([domain], PeersIds::new()), kura, query_handle) }; diff --git a/core/src/smartcontracts/wasm.rs b/core/src/smartcontracts/wasm.rs index 1f6d2474193..3d2b9647242 100644 --- a/core/src/smartcontracts/wasm.rs +++ b/core/src/smartcontracts/wasm.rs @@ -1773,7 +1773,7 @@ mod tests { async fn execute_instruction_exported() -> Result<(), Error> { let (authority, _authority_keypair) = gen_account_in("wonderland"); let kura = Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = State::new(world_with_test_account(&authority), kura, query_handle); let isi_hex = { @@ -1815,7 +1815,7 @@ mod tests { async fn execute_query_exported() -> Result<(), Error> { let (authority, _authority_keypair) = gen_account_in("wonderland"); let kura = Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = State::new(world_with_test_account(&authority), kura, query_handle); let query_hex = encode_hex(SmartContractQueryRequest(QueryRequest::Query( SmartContractQuery::new( @@ -1861,7 +1861,7 @@ mod tests { async fn instruction_limit_reached() -> Result<(), Error> { let (authority, _authority_keypair) = gen_account_in("wonderland"); let kura = Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = State::new(world_with_test_account(&authority), kura, query_handle); @@ -1911,7 +1911,7 @@ mod tests { async fn instructions_not_allowed() -> Result<(), Error> { let (authority, _authority_keypair) = gen_account_in("wonderland"); let kura = Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); + let query_handle = LiveQueryStore::start_test(); let state = State::new(world_with_test_account(&authority), kura, query_handle); let isi_hex = { @@ -1960,7 +1960,7 @@ mod tests { async fn queries_not_allowed() -> Result<(), Error> { let (authority, 
_authority_keypair) = gen_account_in("wonderland");
         let kura = Kura::blank_kura_for_testing();
-        let query_handle = LiveQueryStore::test().start();
+        let query_handle = LiveQueryStore::start_test();
         let state = State::new(world_with_test_account(&authority), kura, query_handle);
 
         let query_hex = encode_hex(QueryBox::from(FindAccountById::new(authority.clone())));
@@ -2002,7 +2002,7 @@ mod tests {
     async fn trigger_related_func_is_not_linked_for_smart_contract() -> Result<(), Error> {
         let (authority, _authority_keypair) = gen_account_in("wonderland");
         let kura = Kura::blank_kura_for_testing();
-        let query_handle = LiveQueryStore::test().start();
+        let query_handle = LiveQueryStore::start_test();
         let state = State::new(world_with_test_account(&authority), kura, query_handle);
 
         let query_hex = encode_hex(QueryBox::from(FindAccountById::new(authority.clone())));
diff --git a/core/src/snapshot.rs b/core/src/snapshot.rs
index 572d0810c9b..03843b6e9a6 100644
--- a/core/src/snapshot.rs
+++ b/core/src/snapshot.rs
@@ -9,9 +9,9 @@ use std::{
 
 use iroha_config::{base::WithOrigin, parameters::actual::Snapshot as Config, snapshot::Mode};
 use iroha_crypto::HashOf;
 use iroha_data_model::block::SignedBlock;
+use iroha_futures::supervisor::{Child, OnShutdown, ShutdownSignal};
 use iroha_logger::prelude::*;
 use serde::{de::DeserializeSeed, Serialize};
-use tokio::sync::mpsc;
 
 use crate::{
     kura::{BlockCount, Kura},
@@ -27,13 +27,6 @@ const SNAPSHOT_TMP_FILE_NAME: &str = "snapshot.tmp";
 
 // /// Errors produced by [`SnapshotMaker`] actor.
 // pub type Result<T, E = Error> = core::result::Result<T, E>;
 
-/// [`SnapshotMaker`] actor handle.
-#[derive(Clone)]
-pub struct SnapshotMakerHandle {
-    /// Not used to actually send messages but to signal that there is no more handles to [`SnapshotMaker`]
-    _message_sender: mpsc::Sender<()>,
-}
-
 /// Actor responsible for [`State`] snapshot reading and writing.
 pub struct SnapshotMaker {
     state: Arc<State>,
@@ -46,18 +39,15 @@ pub struct SnapshotMaker {
 }
 
 impl SnapshotMaker {
-    /// Start [`Self`] actor.
-    pub fn start(self) -> SnapshotMakerHandle {
-        let (message_sender, message_receiver) = mpsc::channel(1);
-        tokio::task::spawn(self.run(message_receiver));
-
-        SnapshotMakerHandle {
-            _message_sender: message_sender,
-        }
+    /// Start the actor.
+    pub fn start(self, shutdown_signal: ShutdownSignal) -> Child {
+        Child::new(
+            tokio::spawn(self.run(shutdown_signal)),
+            OnShutdown::Wait(Duration::from_secs(2)),
+        )
     }
 
-    /// [`Self`] task.
-    async fn run(mut self, mut message_receiver: mpsc::Receiver<()>) {
+    async fn run(mut self, shutdown_signal: ShutdownSignal) {
         let mut snapshot_create_every = tokio::time::interval(self.create_every);
         // Don't try to create snapshots more frequently if the previous one took longer than the interval
         snapshot_create_every.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
@@ -68,8 +58,8 @@ impl SnapshotMaker {
                     // Offload snapshot creation into blocking thread
                     self.create_snapshot().await;
                 },
-                _ = message_receiver.recv() => {
-                    info!("All handler to SnapshotMaker are dropped. Saving latest snapshot and shutting down...");
+                () = shutdown_signal.receive() => {
+                    info!("Saving latest snapshot and shutting down");
                     self.create_snapshot().await;
                     break;
                 }
@@ -137,7 +127,7 @@ impl SnapshotMaker {
 pub fn try_read_snapshot(
     store_dir: impl AsRef<Path>,
     kura: &Arc<Kura>,
-    query_handle: LiveQueryStoreHandle,
+    live_query_store_lazy: impl FnOnce() -> LiveQueryStoreHandle,
     BlockCount(block_count): BlockCount,
 ) -> Result<State, TryReadError> {
     let mut bytes = Vec::new();
@@ -157,7 +147,7 @@ pub fn try_read_snapshot(
     let mut deserializer = serde_json::Deserializer::from_slice(&bytes);
     let seed = KuraSeed {
         kura: Arc::clone(kura),
-        query_handle,
+        query_handle: live_query_store_lazy(),
     };
     let state = seed.deserialize(&mut deserializer)?;
     let state_view = state.view();
@@ -256,7 +246,7 @@ mod tests {
 
     fn state_factory() -> State {
         let kura = Kura::blank_kura_for_testing();
-        let query_handle = LiveQueryStore::test().start();
+        let query_handle = LiveQueryStore::start_test();
         State::new(
             crate::queue::tests::world_with_test_domains(),
             kura,
@@ -285,7 +275,7 @@ mod tests {
         let _wsv = try_read_snapshot(
             &store_dir,
             &Kura::blank_kura_for_testing(),
-            LiveQueryStore::test().start(),
+            LiveQueryStore::start_test,
             BlockCount(usize::try_from(state.view().height()).unwrap()),
         )
         .unwrap();
@@ -299,7 +289,7 @@ mod tests {
         let Err(error) = try_read_snapshot(
             store_dir,
             &Kura::blank_kura_for_testing(),
-            LiveQueryStore::test().start(),
+            LiveQueryStore::start_test,
             BlockCount(15),
         ) else {
             panic!("should not be ok")
@@ -321,7 +311,7 @@ mod tests {
         let Err(error) = try_read_snapshot(
             &store_dir,
             &Kura::blank_kura_for_testing(),
-            LiveQueryStore::test().start(),
+            LiveQueryStore::start_test,
             BlockCount(15),
         ) else {
             panic!("should not be ok")
diff --git a/core/src/state.rs b/core/src/state.rs
index 0358a0e562d..8560e1b321e 100644
--- a/core/src/state.rs
+++ b/core/src/state.rs
@@ -1778,7 +1778,7 @@ mod tests {
         const BLOCK_CNT: usize = 10;
 
         let kura = Kura::blank_kura_for_testing();
-        let query_handle = LiveQueryStore::test().start();
+        let query_handle = LiveQueryStore::start_test();
         let state = State::new(World::default(), kura, query_handle);
 
         let mut state_block = state.block();
@@ -1804,7 +1804,7 @@ mod tests {
         const BLOCK_CNT: usize = 10;
 
         let kura = Kura::blank_kura_for_testing();
-        let query_handle = LiveQueryStore::test().start();
+        let query_handle = LiveQueryStore::start_test();
         let state = State::new(World::default(), kura.clone(), query_handle);
 
         let mut state_block = state.block();
diff --git a/core/src/sumeragi/main_loop.rs b/core/src/sumeragi/main_loop.rs
index a1128c7a8bb..1d402afa1d1 100644
--- a/core/src/sumeragi/main_loop.rs
+++ b/core/src/sumeragi/main_loop.rs
@@ -3,6 +3,7 @@ use std::sync::mpsc;
 
 use iroha_crypto::HashOf;
 use iroha_data_model::{block::*, events::pipeline::PipelineEventBox, peer::PeerId};
+use iroha_futures::supervisor::ShutdownSignal;
 use iroha_p2p::UpdateTopology;
 use tracing::{span, Level};
 
@@ -204,16 +205,16 @@ impl Sumeragi {
         &mut self,
         genesis_public_key: &PublicKey,
         state: &State,
-        shutdown_receiver: &mut tokio::sync::oneshot::Receiver<()>,
+        shutdown_signal: &ShutdownSignal,
     ) -> Result<(), EarlyReturn> {
         info!(addr = %self.peer_id.address, "Listen for genesis");
 
         loop {
             std::thread::sleep(Duration::from_millis(50));
-            early_return(shutdown_receiver).map_err(|e| {
-                debug!(?e, "Early return.");
-                e
-            })?;
+            if shutdown_signal.is_sent() {
+                info!("Shutdown signal received, shutting down Sumeragi...");
+                return Err(EarlyReturn::ShutdownMessageReceived);
+            }
 
             match self.message_receiver.try_recv() {
                 Ok(message) => {
@@ -804,24 +805,12 @@ fn reset_state(
     }
 }
 
-fn should_terminate(shutdown_receiver: &mut tokio::sync::oneshot::Receiver<()>) -> bool {
-    use tokio::sync::oneshot::error::TryRecvError;
-
-    match shutdown_receiver.try_recv() {
-        Err(TryRecvError::Empty) => false,
-        reason => {
-            info!(?reason, "Sumeragi Thread is being shut down.");
-            true
-        }
-    }
-}
-
 #[iroha_logger::log(name = "consensus", skip_all)]
 /// Execute the main loop of [`Sumeragi`]
 pub(crate) fn run(
     genesis_network: GenesisWithPubKey,
     mut sumeragi: Sumeragi,
-    mut shutdown_receiver: tokio::sync::oneshot::Receiver<()>,
+    shutdown_signal: &ShutdownSignal,
     state: Arc<State>,
 ) {
     // Connect peers with initial topology
@@ -837,7 +826,7 @@ pub(crate) fn run(
     if let Err(err) = sumeragi.init_listen_for_genesis(
         &genesis_network.public_key,
         &state,
-        &mut shutdown_receiver,
+        shutdown_signal,
     ) {
         info!(?err, "Sumeragi Thread is being shut down.");
         return;
@@ -873,7 +862,7 @@ pub(crate) fn run(
     // Instant when the previous view change or round happened.
     let mut last_view_change_time = Instant::now();
 
-    while !should_terminate(&mut shutdown_receiver) {
+    while !shutdown_signal.is_sent() {
         if should_sleep {
             let span = span!(Level::TRACE, "main_thread_sleep");
             let _enter = span.enter();
@@ -1045,20 +1034,6 @@ enum EarlyReturn {
     Disconnected,
 }
 
-fn early_return(
-    shutdown_receiver: &mut tokio::sync::oneshot::Receiver<()>,
-) -> Result<(), EarlyReturn> {
-    use tokio::sync::oneshot::error::TryRecvError;
-
-    match shutdown_receiver.try_recv() {
-        Ok(()) | Err(TryRecvError::Closed) => {
-            info!("Sumeragi Thread is being shut down.");
-            Err(EarlyReturn::ShutdownMessageReceived)
-        }
-        Err(TryRecvError::Empty) => Ok(()),
-    }
-}
-
 /// Strategy to apply block to sumeragi.
 trait ApplyBlockStrategy {
     const LOG_MESSAGE: &'static str;
 
@@ -1235,7 +1210,7 @@ mod tests {
         assert!(domain.add_account(account).is_none());
         let world = World::with([domain], topology.ordered_peers.clone());
         let kura = Kura::blank_kura_for_testing();
-        let query_handle = LiveQueryStore::test().start();
+        let query_handle = LiveQueryStore::start_test();
         let state = State::new(world, Arc::clone(&kura), query_handle);
 
         // Create "genesis" block
diff --git a/core/src/sumeragi/mod.rs b/core/src/sumeragi/mod.rs
index 35f734ce49c..0d2d6bc783e 100644
--- a/core/src/sumeragi/mod.rs
+++ b/core/src/sumeragi/mod.rs
@@ -11,13 +11,13 @@ use eyre::Result;
 use iroha_config::parameters::actual::{Common as CommonConfig, Sumeragi as SumeragiConfig};
 use iroha_crypto::{KeyPair, SignatureOf};
 use iroha_data_model::{block::SignedBlock, prelude::*};
+use iroha_futures::supervisor::{spawn_os_thread_as_future, Child, OnShutdown, ShutdownSignal};
 use iroha_genesis::GenesisTransaction;
 use iroha_logger::prelude::*;
 use network_topology::{Role, Topology};
 
 use crate::{
     block::ValidBlock,
-    handler::ThreadHandler,
     kura::BlockCount,
     state::{State, StateBlock},
 };
@@ -35,7 +35,6 @@ use crate::{kura::Kura, prelude::*, queue::Queue, EventsSender, IrohaNetwork, NetworkMessage};
 pub struct SumeragiHandle {
     /// Counter for amount of dropped messages by sumeragi
     dropped_messages_metric: iroha_telemetry::metrics::DroppedMessagesCounter,
-    _thread_handle: Arc<ThreadHandler>,
     // Should be dropped after `_thread_handle` to prevent sumeragi thread from panicking
     control_message_sender: mpsc::SyncSender<ControlFlowMessage>,
     message_sender: mpsc::SyncSender<BlockMessage>,
@@ -110,14 +109,16 @@ impl SumeragiHandle {
             Topology::recreate_topology(block.as_ref(), view_change_index, peers)
         })
     }
+}
 
+impl SumeragiStartArgs {
     /// Start [`Sumeragi`] actor and return handle to it.
     ///
     /// # Panics
     /// May panic if something is off during initialization, which is a bug.
     #[allow(clippy::too_many_lines)]
-    pub fn start(
-        SumeragiStartArgs {
+    pub fn start(self, shutdown_signal: ShutdownSignal) -> (SumeragiHandle, Child) {
+        let Self {
             sumeragi_config,
             common_config,
             events_sender,
@@ -132,8 +133,8 @@ impl SumeragiHandle {
                 view_changes,
                 dropped_messages,
             },
-        }: SumeragiStartArgs,
-    ) -> SumeragiHandle {
+        } = self;
+
         let (control_message_sender, control_message_receiver) = mpsc::sync_channel(100);
         let (message_sender, message_receiver) = mpsc::sync_channel(100);
 
@@ -175,7 +176,7 @@ impl SumeragiHandle {
         for block in blocks_iter {
             let mut state_block = state.block();
 
-            recreate_topology = Self::replay_block(
+            recreate_topology = SumeragiHandle::replay_block(
                 &common_config.chain_id,
                 &genesis_network.public_key,
                 &block,
@@ -217,32 +218,24 @@ impl SumeragiHandle {
             view_changes_metric: view_changes,
         };
 
-        // Oneshot channel to allow forcefully stopping the thread.
- let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel(); - - let thread_handle = { - let state = Arc::clone(&state); - std::thread::Builder::new() - .name("sumeragi thread".to_owned()) - .spawn(move || { - main_loop::run(genesis_network, sumeragi, shutdown_receiver, state); - }) - .expect("Sumeragi thread spawn should not fail.") - }; - - let shutdown = move || { - if let Err(error) = shutdown_sender.send(()) { - iroha_logger::error!(?error); - } - }; - - let thread_handle = ThreadHandler::new(Box::new(shutdown), thread_handle); - SumeragiHandle { - dropped_messages_metric: dropped_messages, - control_message_sender, - message_sender, - _thread_handle: Arc::new(thread_handle), - } + let child = Child::new( + tokio::task::spawn(spawn_os_thread_as_future( + std::thread::Builder::new().name("sumeragi".to_owned()), + move || { + main_loop::run(genesis_network, sumeragi, &shutdown_signal, state); + }, + )), + OnShutdown::Wait(Duration::from_secs(5)), + ); + + ( + SumeragiHandle { + dropped_messages_metric: dropped_messages, + control_message_sender, + message_sender, + }, + child, + ) } } diff --git a/core/test_network/Cargo.toml b/core/test_network/Cargo.toml index 410919ef7aa..15508c0da39 100644 --- a/core/test_network/Cargo.toml +++ b/core/test_network/Cargo.toml @@ -9,12 +9,13 @@ license.workspace = true [dependencies] iroha_core = { workspace = true } -irohad = { workspace = true, features = ["test-network"] } +irohad = { workspace = true } iroha = { workspace = true } iroha_crypto = { workspace = true } iroha_config = { workspace = true } iroha_data_model = { workspace = true } +iroha_futures = { workspace = true } iroha_primitives = { workspace = true } iroha_logger = { workspace = true } iroha_genesis = { workspace = true } diff --git a/core/test_network/src/lib.rs b/core/test_network/src/lib.rs index 5bec6392ab5..b1ca232c60c 100644 --- a/core/test_network/src/lib.rs +++ b/core/test_network/src/lib.rs @@ -15,6 +15,7 @@ use iroha_config::parameters::actual::{Root as Config, Sumeragi, TrustedPeers}; pub use iroha_core::state::StateReadOnly; use iroha_crypto::{ExposedPrivateKey, KeyPair}; use iroha_data_model::{query::QueryOutputBox, ChainId}; +use iroha_futures::supervisor::ShutdownSignal; use iroha_genesis::{GenesisTransaction, RawGenesisTransaction}; use iroha_logger::{warn, InstrumentFutures}; use iroha_primitives::{ @@ -374,6 +375,8 @@ pub struct Peer { pub p2p_address: SocketAddr, /// The key-pair for the peer pub key_pair: KeyPair, + /// Shutdown handle + shutdown: ShutdownSignal, /// Iroha server pub irohad: Option, /// Temporary directory @@ -397,7 +400,8 @@ impl std::cmp::Eq for Peer {} impl Drop for Peer { fn drop(&mut self) { - self.stop(); + // TODO: wait for complete shutdown + self.terminate(); } } @@ -451,27 +455,35 @@ impl Peer { ); let logger = iroha_logger::test_logger(); - let (_, irohad) = Iroha::start_network(config, genesis, logger) - .instrument(info_span) + let (irohad, run_fut) = Iroha::start(config, genesis, logger, self.shutdown.clone()) .await - .expect("Failed to start Iroha"); + .expect("Iroha should start in test network"); + + let _handle = tokio::spawn( + async move { + if let Err(error) = run_fut.await { + iroha_logger::error!(?error, "Peer exited with an error"); + }; + } + .instrument(info_span), + ); self.irohad = Some(irohad); - time::sleep(Duration::from_millis(300)).await; // Prevent temporary directory deleting self.temp_dir = Some(temp_dir); } - /// Stop the peer if it's running - pub fn stop(&mut self) { - 
iroha_logger::info!(
-            p2p_addr = %self.p2p_address,
-            api_addr = %self.api_address,
-            "Stopping peer",
-        );
-
-        iroha_logger::info!("Shutting down peer...");
-        self.irohad.take();
+    /// Terminate the peer
+    // FIXME: support _complete_ forceful termination, with waiting for full abort
+    pub fn terminate(&mut self) {
+        if let Some(_irohad) = self.irohad.take() {
+            iroha_logger::info!(
+                p2p_addr = %self.p2p_address,
+                api_addr = %self.api_address,
+                "Terminating peer",
+            );
+            self.shutdown.send();
+        }
     }
 
     /// Creates peer
@@ -486,11 +498,13 @@ impl Peer {
         let p2p_address = local_unique_port()?;
         let api_address = local_unique_port()?;
         let id = PeerId::new(p2p_address.clone(), key_pair.public_key().clone());
+        let shutdown = ShutdownSignal::new();
         Ok(Self {
             id,
             key_pair,
             p2p_address,
             api_address,
+            shutdown,
             irohad: None,
             temp_dir: None,
         })
diff --git a/futures/Cargo.toml b/futures/Cargo.toml
index 21058ecbc9d..f5a60892df2 100644
--- a/futures/Cargo.toml
+++ b/futures/Cargo.toml
@@ -23,7 +23,10 @@ iroha_logger = { workspace = true }
 rand = { workspace = true }
 serde_json = { workspace = true }
 serde = { workspace = true, features = ["derive"] }
-tokio = { workspace = true, features = ["rt", "rt-multi-thread", "macros"] }
+tokio = { workspace = true, features = ["rt", "rt-multi-thread", "macros", "signal"] }
+tokio-util = { workspace = true }
+thread-local-panic-hook = { version = "0.1.0" }
+thiserror = { workspace = true }
 
 [dev-dependencies]
 tokio-stream = "0.1.15"
diff --git a/futures/src/lib.rs b/futures/src/lib.rs
index 516182e08d9..98e76ff710d 100644
--- a/futures/src/lib.rs
+++ b/futures/src/lib.rs
@@ -1,4 +1,7 @@
 //! Crate with various Iroha futures
+
+pub mod supervisor;
+
 use std::{
     future::Future,
     pin::Pin,
diff --git a/futures/src/supervisor.rs b/futures/src/supervisor.rs
new file mode 100644
index 00000000000..ab5a5b3e567
--- /dev/null
+++ b/futures/src/supervisor.rs
@@ -0,0 +1,843 @@
+//! Lightweight supervisor for tokio tasks.
+//!
+//! What it does:
+//!
+//! - Monitors multiple children (as spawned [`JoinHandle`]s)
+//! - Provides a single shutdown signal for everything
+//! - Supports a graceful shutdown timeout before aborting a child (via [`OnShutdown`])
+//! - If a child panics, initiates shutdown and exits with an error
+//! - If a child exits before the shutdown signal, also initiates shutdown and exits with an error.
+//!   Note: this might not always be the desirable behaviour, but _currently_ there are no other
+//!   cases in Iroha.
+//!   This behaviour could easily be extended to support refined strategies.
+//! - Logs children's lifecycle
+//!
+//! What it doesn't:
+//!
+//! - Doesn't support restarting a child.
+//!   To implement that, we need a formal actor system.
+
+use std::{
+    ops::ControlFlow,
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc, RwLock,
+    },
+    time::Duration,
+};
+
+use iroha_logger::{prelude::Span, InstrumentFutures};
+use tokio::{
+    sync::{mpsc, oneshot, watch},
+    task::JoinHandle,
+    time::timeout,
+};
+use tokio_util::sync::CancellationToken;
+
+/// Supervisor for tokio tasks.
+#[derive(Debug)]
+pub struct Supervisor {
+    task_tx: mpsc::Sender<SupervisorMessage>,
+    // TODO: abort on drop?
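+    // join handle of the supervisor's own bookkeeping task, spawned in `Supervisor::new`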
+    task_handle: JoinHandle<Result<(), Error>>,
+    shutdown_signal: ShutdownSignal,
+    monitoring_some: Arc<AtomicBool>,
+}
+
+#[derive(Debug)]
+enum SupervisorMessage {
+    TaskAdded,
+    TaskFinished { panic: bool },
+    FinishIfEmpty,
+}
+
+struct SupervisorTask {
+    active_tasks: u8,
+    caught_panic: bool,
+    caught_unexpected_exit: bool,
+    shutdown_signal: ShutdownSignal,
+    rx: mpsc::Receiver<SupervisorMessage>,
+}
+
+impl SupervisorTask {
+    fn new(rx: mpsc::Receiver<SupervisorMessage>, shutdown_signal: ShutdownSignal) -> Self {
+        Self {
+            active_tasks: 0,
+            caught_panic: false,
+            caught_unexpected_exit: false,
+            shutdown_signal,
+            rx,
+        }
+    }
+
+    async fn run(mut self) -> Result<(), Error> {
+        loop {
+            tokio::select! {
+                Some(message) = self.rx.recv() => {
+                    iroha_logger::trace!(?message, "Got a message");
+                    if let ControlFlow::Break(()) = self.handle_message(&message) {
+                        break
+                    }
+                }
+                else => break,
+            }
+        }
+
+        // TODO: could report several errors at once. use error-stack?
+        if self.caught_panic {
+            Err(Error::ChildPanicked)
+        } else if self.caught_unexpected_exit {
+            Err(Error::UnexpectedExit)
+        } else {
+            Ok(())
+        }
+    }
+
+    fn handle_message(&mut self, message: &SupervisorMessage) -> ControlFlow<(), ()> {
+        match message {
+            SupervisorMessage::TaskAdded => {
+                self.active_tasks += 1;
+                ControlFlow::Continue(())
+            }
+            SupervisorMessage::TaskFinished { panic } => {
+                self.active_tasks -= 1;
+                if *panic {
+                    self.caught_panic = true;
+                    if !self.shutdown_signal.is_sent() {
+                        iroha_logger::error!("Some task panicked, shutting down everything...");
+                        self.shutdown_signal.send();
+                    }
+                } else if !self.shutdown_signal.is_sent() {
+                    self.caught_unexpected_exit = true;
+                    iroha_logger::error!(
+                        "Some task exited unexpectedly, shutting down everything..."
+                    );
+                    self.shutdown_signal.send();
+                }
+                if self.active_tasks == 0 {
+                    ControlFlow::Break(())
+                } else {
+                    ControlFlow::Continue(())
+                }
+            }
+            SupervisorMessage::FinishIfEmpty => {
+                if self.active_tasks == 0 {
+                    ControlFlow::Break(())
+                } else {
+                    ControlFlow::Continue(())
+                }
+            }
+        }
+    }
+}
+
+impl Default for Supervisor {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+// TODO: shall we
+
+// impl Drop for Supervisor {
+//     fn drop(&mut self) {
+//         // TODO: fire shutdown signal?
+//         self.task_handle.abort();
+//     }
+// }
+
+impl Supervisor {
+    /// Create a new supervisor.
+    ///
+    /// This must be executed within a tokio runtime.
+    pub fn new() -> Self {
+        let (task_tx, task_rx) = mpsc::channel(u8::MAX as usize);
+        let shutdown_signal = ShutdownSignal::new();
+
+        let task_handle = tokio::spawn(SupervisorTask::new(task_rx, shutdown_signal.clone()).run());
+
+        Self {
+            task_handle,
+            task_tx,
+            shutdown_signal,
+            monitoring_some: Arc::new(AtomicBool::new(false)),
+        }
+    }
+
+    /// Get a copy of the supervisor's shutdown signal
+    pub fn shutdown_signal(&self) -> ShutdownSignal {
+        self.shutdown_signal.clone()
+    }
+
+    /// Monitors a given [`Child`].
+    ///
+    /// When the child panics, the supervisor triggers the shutdown signal and waits until handles
+    /// marked with [`OnShutdown::Wait`] complete within their given timeout. When all such handles
+    /// complete (or the timeout elapses), it aborts all incomplete handles.
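+    ///
+    /// Accepts either a [`Child`] or a bare [`JoinHandle`]; a bare handle is
+    /// aborted immediately on shutdown (see the [`From`] impl below).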
+    #[track_caller]
+    pub fn monitor(&self, child: impl Into<Child>) {
+        let child = child.into();
+        child.span.in_scope(|| {
+            iroha_logger::debug!("Start monitoring a child");
+        });
+
+        let task_tx = self.task_tx.clone();
+        tokio::spawn(async move {
+            task_tx
+                .send(SupervisorMessage::TaskAdded)
+                .await
+                .expect("channel could not be closed yet");
+        });
+        // we need this flag because the message sent by the task we've just spawned
+        // might not arrive before `wait_all` is called
+        self.monitoring_some.fetch_or(true, Ordering::Relaxed);
+
+        let task_handle = ChildHandle::new(child, self.shutdown_signal());
+
+        // forward task result to the supervisor task
+        {
+            let mut task_handle = task_handle.clone();
+            let sup_tx = self.task_tx.clone();
+            tokio::spawn(async move {
+                if let Ok(task_result) = task_handle.finished().await {
+                    let message = match task_result {
+                        TaskResult::Ok | TaskResult::Cancel => {
+                            SupervisorMessage::TaskFinished { panic: false }
+                        }
+                        TaskResult::Panic => SupervisorMessage::TaskFinished { panic: true },
+                    };
+                    let _ = sup_tx.send(message).await;
+                };
+            });
+        }
+    }
+
+    /// Spawns a task that will initiate supervisor shutdown on SIGINT/SIGTERM signals.
+    /// # Errors
+    /// See [`signal::unix::signal`] errors.
+    pub fn setup_shutdown_on_os_signals(&self) -> Result<(), Error> {
+        use tokio::signal;
+
+        let mut sigint = signal::unix::signal(signal::unix::SignalKind::interrupt())?;
+        let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())?;
+
+        let shutdown_signal = self.shutdown_signal();
+        self.monitor(tokio::spawn(async move {
+            tokio::select! {
+                _ = sigint.recv() => {
+                    iroha_logger::info!("SIGINT received, shutting down...");
+                },
+                _ = sigterm.recv() => {
+                    iroha_logger::info!("SIGTERM received, shutting down...");
+                },
+            }
+
+            shutdown_signal.send();
+        }));
+
+        Ok(())
+    }
+
+    /// Spawns a task that will shut down the supervisor once the external
+    /// [`ShutdownSignal`] is sent.
+    pub fn shutdown_on_external_signal(&self, external_signal: ShutdownSignal) {
+        let self_signal = self.shutdown_signal();
+
+        self.monitor(tokio::spawn(async move {
+            external_signal.receive().await;
+            self_signal.send();
+        }))
+    }
+
+    /// Wait until all supervised children terminate.
+    ///
+    /// Returns [`Ok`] if all children exited/aborted as expected after the shutdown
+    /// signal was sent.
+    ///
+    /// # Errors
+    /// If any child panicked during execution or exited/aborted before the shutdown signal was sent.
+    pub async fn wait_all(self) -> Result<(), Error> {
+        if !self.monitoring_some.load(Ordering::Relaxed) {
+            self.task_tx
+                .send(SupervisorMessage::FinishIfEmpty)
+                .await
+                .expect("channel could not be closed yet");
+        }
+        self.task_handle
+            .await
+            .expect("supervisor task shouldn't panic")
+    }
+}
+
+#[derive(Copy, Clone, Debug)]
+enum TaskResult {
+    Ok,
+    Panic,
+    Cancel,
+}
+
+#[derive(Clone)]
+struct ChildHandle {
+    result_rx: watch::Receiver<Option<TaskResult>>,
+    result: Option<TaskResult>,
+    abort_tx: mpsc::Sender<()>,
+}
+
+impl ChildHandle {
+    fn new(
+        Child {
+            span,
+            handle,
+            on_shutdown,
+        }: Child,
+        shutdown_signal: ShutdownSignal,
+    ) -> Self {
+        let (result_tx, result_rx) = watch::channel(None);
+        let (abort_tx, mut abort_rx) = mpsc::channel(1);
+        let abort_handle = handle.abort_handle();
+
+        tokio::spawn(
+            async move {
+                let result = match handle.await {
+                    Ok(()) => {
+                        iroha_logger::debug!("Child finished OK");
+                        TaskResult::Ok
+                    }
+                    Err(err) if err.is_panic() => {
+                        // we could use `err.into_panic()`, but it prints just `Any { .. }`
+                        iroha_logger::error!("Child panicked");
+                        TaskResult::Panic
+                    }
+                    Err(err) if err.is_cancelled() => {
+                        iroha_logger::debug!("Child aborted"); // oh..
+                        TaskResult::Cancel
+                    }
+                    _ => unreachable!(),
+                };
+
+                let _ = result_tx.send(Some(result));
+            }
+            .instrument(span.clone()),
+        );
+
+        let mut result_for_abort = result_rx.clone();
+        tokio::spawn(async move {
+            tokio::select! {
+                Some(()) = abort_rx.recv() => {
+                    abort_handle.abort();
+                }
+                _ = result_for_abort.changed() => {
+                    // this task can exit
+                }
+                else => {}
+            }
+        });
+
+        let child_handle = Self {
+            result_rx,
+            result: None,
+            abort_tx,
+        };
+
+        let mut handle_clone = child_handle.clone();
+        tokio::spawn(async move {
+            tokio::select! {
+                _ = handle_clone.wait_done() => {
+                    // fine, exiting the task
+                }
+                () = shutdown_signal.receive() => {
+                    match on_shutdown {
+                        OnShutdown::Abort => {
+                            iroha_logger::debug!("Shutdown signal received, aborting...");
+                            let _ = handle_clone.abort().await;
+                        }
+                        OnShutdown::Wait(duration) => {
+                            iroha_logger::debug!(?duration, "Shutdown signal received, waiting for child shutdown...");
+                            if timeout(duration, handle_clone.wait_done()).await.is_err() {
+                                iroha_logger::debug!(expected = ?duration, "Child shutdown took longer than expected, aborting...");
+                                let _ = handle_clone.abort().await;
+                                let _ = handle_clone.wait_done().await;
+                            }
+                        }
+                    }
+                }
+            }
+        }.instrument(span));
+
+        child_handle
+    }
+
+    async fn wait_done(&mut self) -> Result<(), watch::error::RecvError> {
+        if self.result.is_some() {
+            return Ok(());
+        }
+        self.result_rx.changed().await?;
+        self.result = *self.result_rx.borrow_and_update();
+        Ok(())
+    }
+
+    async fn finished(&mut self) -> Result<TaskResult, watch::error::RecvError> {
+        self.wait_done().await?;
+        Ok(self.result.expect("should be some anyway"))
+    }
+
+    async fn abort(&self) -> Result<(), mpsc::error::SendError<()>> {
+        self.abort_tx.send(()).await
+    }
+}
+
+/// Signal indicating system shutdown. Could be cloned around.
+///
+/// It is effectively a wrapper around [`CancellationToken`], but with different naming.
+#[derive(Clone, Debug, Default)]
+pub struct ShutdownSignal(CancellationToken);
+
+impl ShutdownSignal {
+    /// Constructor
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Send the shutdown signal, resolving all [`Self::receive`] futures.
+    pub fn send(&self) {
+        self.0.cancel();
+    }
+
+    /// Receive the shutdown signal. Resolves after [`Self::send`].
+    pub async fn receive(&self) {
+        self.0.cancelled().await
+    }
+
+    /// Sync check whether the shutdown signal was sent
+    pub fn is_sent(&self) -> bool {
+        self.0.is_cancelled()
+    }
+}
+
+/// Spawn a [`std::thread`] as a future that finishes when the thread finishes and panics
+/// when the thread panics.
+///
+/// Its intention is to link an OS thread to [`Supervisor`] in the following way:
+///
+/// ```
+/// use std::time::Duration;
+///
+/// use iroha_futures::supervisor::{
+///     spawn_os_thread_as_future, Child, OnShutdown, ShutdownSignal, Supervisor,
+/// };
+///
+/// fn spawn_heavy_work(shutdown_signal: ShutdownSignal) -> Child {
+///     Child::new(
+///         tokio::spawn(spawn_os_thread_as_future(
+///             std::thread::Builder::new().name("heavy_worker".to_owned()),
+///             move || {
+///                 loop {
+///                     if shutdown_signal.is_sent() {
+///                         break;
+///                     }
+///                     // do heavy work...
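+///                     // (the sleep below stands in for a unit of work between signal checks)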
+///                     std::thread::sleep(Duration::from_millis(100));
+///                 }
+///             },
+///         )),
+///         OnShutdown::Wait(Duration::from_secs(1)),
+///     )
+/// }
+///
+/// #[tokio::main]
+/// async fn main() {
+///     let supervisor = Supervisor::new();
+///     supervisor.monitor(spawn_heavy_work(supervisor.shutdown_signal()));
+///
+///     let signal = supervisor.shutdown_signal();
+///     tokio::spawn(async move {
+///         tokio::time::sleep(Duration::from_millis(300)).await;
+///         signal.send();
+///     });
+///
+///     supervisor.wait_all().await.unwrap();
+/// }
+/// ```
+///
+/// **Note:** this function doesn't provide a mechanism to shut down the thread.
+/// You should handle it within the closure on your own, e.g. by passing [`ShutdownSignal`] inside.
+pub async fn spawn_os_thread_as_future<F>(builder: std::thread::Builder, f: F)
+where
+    F: FnOnce(),
+    F: Send + 'static,
+{
+    let (ok_tx, ok_rx) = oneshot::channel();
+    let (err_tx, err_rx) = oneshot::channel();
+
+    // FIXME: we cannot just _move_ `err_tx` inside of the thread's panic hook
+    let err_tx = RwLock::new(Some(err_tx));
+
+    // we are okay to drop the handle; the thread will continue running in a detached way
+    let _handle: std::thread::JoinHandle<_> = builder
+        .spawn(move || {
+            let default_hook = thread_local_panic_hook::take_hook();
+            thread_local_panic_hook::set_hook(Box::new(move |info| {
+                // the receiver might be dropped
+                let _ = err_tx
+                    .write()
+                    .expect("no one else should lock this sender")
+                    .take()
+                    .expect("should be taken only once, on hook trigger")
+                    .send(());
+                // TODO: need to print info in a custom way?
+                default_hook(info);
+            }));
+
+            f();
+
+            // the receiver might be dropped
+            let _ = ok_tx.send(());
+        })
+        .expect("should spawn thread normally");
+
+    tokio::select! {
+        _ = ok_rx => {
+            // fine, do nothing
+        }
+        _ = err_rx => {
+            panic!("thread panicked");
+        }
+    }
+}
+
+/// Supervisor child.
+pub struct Child {
+    span: Span,
+    handle: JoinHandle<()>,
+    on_shutdown: OnShutdown,
+}
+
+impl Child {
+    /// Create a new supervisor child
+    #[track_caller]
+    pub fn new(handle: JoinHandle<()>, on_shutdown: OnShutdown) -> Self {
+        let caller_location = std::panic::Location::caller().to_string();
+        let span = iroha_logger::debug_span!("supervisor_child_monitor", %caller_location);
+
+        Self {
+            span,
+            handle,
+            on_shutdown,
+        }
+    }
+}
+
+impl From<JoinHandle<()>> for Child {
+    #[track_caller]
+    fn from(value: JoinHandle<()>) -> Self {
+        Self::new(value, OnShutdown::Abort)
+    }
+}
+
+/// Supervisor errors
+#[derive(Debug, thiserror::Error)]
+#[allow(missing_docs)]
+pub enum Error {
+    #[error("Some of the supervisor children panicked")]
+    ChildPanicked,
+    #[error("Some of the supervisor children exited unexpectedly")]
+    UnexpectedExit,
+    #[error("IO error")]
+    IO(#[from] std::io::Error),
+}
+
+/// Specifies the supervisor action regarding a [`Child`] when shutdown happens.
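+///
+/// Defaults to [`OnShutdown::Abort`].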
+#[derive(Default, Copy, Clone)] +pub enum OnShutdown { + /// Abort the child immediately + #[default] + Abort, + /// Wait until the child exits/aborts on its own; abort if it takes too long + Wait(Duration), +} + +#[cfg(test)] +mod tests { + use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }; + + use tokio::{ + sync::{mpsc, oneshot}, + time::sleep, + }; + + use super::*; + + const TICK_TIMEOUT: Duration = Duration::from_millis(10); + /// For some reason, when all tests are run simultaneously, tests with OS spawns take longer + /// than just [`TICK_TIMEOUT`] + const OS_THREAD_SPAWN_TICK: Duration = Duration::from_millis(500); + const SHUTDOWN_WITHIN_TICK: OnShutdown = OnShutdown::Wait(TICK_TIMEOUT); + + #[tokio::test] + async fn empty_supervisor_just_exits() { + let sup = Supervisor::new(); + timeout(TICK_TIMEOUT, sup.wait_all()) + .await + .expect("should exit immediately") + .expect("should not emit error"); + } + + #[tokio::test] + async fn happy_graceful_shutdown() { + #[derive(Debug)] + enum Message { + Ping { pong: oneshot::Sender<()> }, + Stopped, + } + + let sup = Supervisor::new(); + + let (tx_into, mut rx_into) = mpsc::channel(1); + let (tx_out, rx_out) = oneshot::channel(); + + { + let shutdown = sup.shutdown_signal(); + sup.monitor(Child::new( + tokio::spawn(async move { + loop { + tokio::select! { + Some(Message::Ping { pong }) = rx_into.recv() => { + pong.send(()).unwrap(); + }, + () = shutdown.receive() => { + tx_out.send(Message::Stopped).unwrap(); + break; + } + } + } + }), + SHUTDOWN_WITHIN_TICK, + )); + } + + // ensure task is spinning + timeout(TICK_TIMEOUT, async { + let (tx, rx) = oneshot::channel(); + tx_into.send(Message::Ping { pong: tx }).await.unwrap(); + rx.await.unwrap(); + }) + .await + .unwrap(); + + let shutdown = sup.shutdown_signal(); + let sup_handle = tokio::spawn(sup.wait_all()); + + // send shutdown signal + shutdown.send(); + timeout(TICK_TIMEOUT, async { + let Message::Stopped = rx_out.await.unwrap() else { + panic!("expected stopped message"); + }; + }) + .await + .unwrap(); + + // we can now expect supervisor to stop without errors + timeout(TICK_TIMEOUT, sup_handle) + .await + .unwrap() + .expect("supervisor run should not panic") + .expect("supervisor should not find any nested panics"); + } + + #[tokio::test] + async fn supervisor_catches_panic_of_a_monitored_task() { + let sup = Supervisor::new(); + + sup.monitor(tokio::spawn(async { + panic!("my panic should not be unnoticed") + })); + + let Error::ChildPanicked = timeout(TICK_TIMEOUT, sup.wait_all()) + .await + .unwrap() + .expect_err("should catch the panic") + else { + panic!("other errors aren't expected") + }; + } + + #[tokio::test] + async fn supervisor_sends_shutdown_when_some_task_exits() { + let sup = Supervisor::new(); + + // exits immediately, not expected + sup.monitor(tokio::spawn(async {})); + + // some task that needs shutdown gracefully + let signal = sup.shutdown_signal(); + let (graceful_tx, graceful_rx) = oneshot::channel(); + sup.monitor(Child::new( + tokio::spawn(async move { + signal.receive().await; + graceful_tx.send(()).unwrap(); + }), + SHUTDOWN_WITHIN_TICK, + )); + + let sup_handle = tokio::spawn(sup.wait_all()); + + timeout(TICK_TIMEOUT, graceful_rx) + .await + .expect("should shutdown everything immediately") + .expect("should receive message fine"); + + let Error::UnexpectedExit = timeout(TICK_TIMEOUT, sup_handle) + .await + .unwrap() + .expect("supervisor should not panic") + .expect_err("should handle unexpected exit") + else { + panic!("other errors 
aren't expected") + }; + } + + #[tokio::test] + async fn graceful_shutdown_when_some_task_panics() { + let sup = Supervisor::new(); + + let signal = sup.shutdown_signal(); + sup.monitor(tokio::spawn(async { panic!() })); + + let Error::ChildPanicked = timeout(TICK_TIMEOUT, sup.wait_all()) + .await + .unwrap() + .expect_err("should catch the panic") + else { + panic!("other errors aren't expected") + }; + + assert!(signal.is_sent()) + } + + fn spawn_task_with_graceful_shutdown( + sup: &Supervisor, + shutdown_time: Duration, + timeout: Duration, + ) -> Arc { + let graceful = Arc::new(AtomicBool::new(false)); + + let signal = sup.shutdown_signal(); + let graceful_clone = graceful.clone(); + sup.monitor(Child::new( + tokio::spawn(async move { + signal.receive().await; + sleep(shutdown_time).await; + graceful_clone.fetch_or(true, Ordering::Relaxed); + }), + OnShutdown::Wait(timeout), + )); + + graceful + } + + #[tokio::test] + async fn actually_waits_for_shutdown() { + const ACTUAL_SHUTDOWN: Duration = Duration::from_millis(50); + const TIMEOUT: Duration = Duration::from_millis(100); + + let sup = Supervisor::new(); + let signal = sup.shutdown_signal(); + let graceful = spawn_task_with_graceful_shutdown(&sup, ACTUAL_SHUTDOWN, TIMEOUT); + let sup_fut = tokio::spawn(sup.wait_all()); + + signal.send(); + timeout(ACTUAL_SHUTDOWN + TICK_TIMEOUT, sup_fut) + .await + .expect("should finish within this time") + .expect("supervisor should not panic") + .expect("supervisor should exit fine"); + assert!(graceful.load(Ordering::Relaxed)); + } + + #[tokio::test] + async fn aborts_task_if_shutdown_takes_long() { + const ACTUAL_SHUTDOWN: Duration = Duration::from_millis(100); + const TIMEOUT: Duration = Duration::from_millis(50); + + // Start system + let sup = Supervisor::new(); + let signal = sup.shutdown_signal(); + let graceful = spawn_task_with_graceful_shutdown(&sup, ACTUAL_SHUTDOWN, TIMEOUT); + let sup_fut = tokio::spawn(sup.wait_all()); + + // Initiate shutdown + signal.send(); + timeout(TIMEOUT + TICK_TIMEOUT, sup_fut) + .await + .expect("should finish within this time") + .expect("supervisor should not panic") + .expect("shutdown took too long, but it is not an error"); + assert!(!graceful.load(Ordering::Relaxed)); + } + + #[tokio::test] + async fn can_monitor_os_thread_shutdown() { + const LOOP_SLEEP: Duration = Duration::from_millis(5); + const TIMEOUT: Duration = Duration::from_millis(50); + + let sup = Supervisor::new(); + let signal = sup.shutdown_signal(); + let signal2 = sup.shutdown_signal(); + let (ready_tx, ready_rx) = std::sync::mpsc::sync_channel(1); + let graceful = Arc::new(AtomicBool::new(false)); + let graceful2 = graceful.clone(); + sup.monitor(Child::new( + tokio::spawn(spawn_os_thread_as_future( + std::thread::Builder::new(), + move || { + // FIXME ready state + iroha_logger::info!("sending message"); + ready_tx.send(()).unwrap(); + iroha_logger::info!("done sending"); + loop { + if signal.is_sent() { + graceful.fetch_or(true, Ordering::Relaxed); + break; + } + std::thread::sleep(LOOP_SLEEP); + } + }, + )), + OnShutdown::Wait(TIMEOUT), + )); + // need to yield so that it can actually start the thread + tokio::task::yield_now().await; + let sup_fut = tokio::spawn(sup.wait_all()); + + ready_rx + .recv_timeout(OS_THREAD_SPAWN_TICK) + .expect("thread should start by now"); + signal2.send(); + timeout(TICK_TIMEOUT, sup_fut) + .await + .expect("should shutdown within timeout") + .expect("should not panic") + .expect("should shutdown without errors"); + 
+        assert!(graceful2.load(Ordering::Relaxed));
+    }
+
+    #[tokio::test]
+    async fn can_catch_os_thread_panic() {
+        let sup = Supervisor::new();
+        sup.monitor(tokio::spawn(spawn_os_thread_as_future(
+            std::thread::Builder::new(),
+            || panic!("oops"),
+        )));
+        let Error::ChildPanicked = timeout(OS_THREAD_SPAWN_TICK, sup.wait_all())
+            .await
+            .expect("should terminate immediately")
+            .expect_err("should catch panic")
+        else {
+            panic!("no other error expected");
+        };
+    }
+}
diff --git a/logger/src/lib.rs b/logger/src/lib.rs
index 35fd022889d..057c079eab4 100644
--- a/logger/src/lib.rs
+++ b/logger/src/lib.rs
@@ -5,6 +5,7 @@ pub mod telemetry;
 
 use std::{
     fmt::Debug,
+    str::FromStr,
     sync::{
         atomic::{AtomicBool, Ordering},
         OnceLock,
@@ -87,6 +88,8 @@ pub fn init_global(config: InitConfig) -> Result {
 
 /// Returns the once lazily initialised global logger for testing purposes.
 ///
+/// Log level may be modified via the `TEST_LOG_LEVEL` environment variable.
+///
 /// # Panics
 /// If [`init_global`] or [`disable_global`] were called first.
 pub fn test_logger() -> LoggerHandle {
@@ -100,7 +103,10 @@ pub fn test_logger() -> LoggerHandle {
         // `test_logger` simple and also will emphasise isolation which is necessary anyway in
         // case of singleton mocking (where the logger is the singleton).
         let config = Config {
-            level: Level::DEBUG,
+            level: std::env::var("TEST_LOG_LEVEL")
+                .ok()
+                .and_then(|raw| Level::from_str(&raw).ok())
+                .unwrap_or(Level::DEBUG),
             format: Format::Pretty,
         };
 
diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml
index a77e8c28f93..8d8d2a97f7d 100644
--- a/p2p/Cargo.toml
+++ b/p2p/Cargo.toml
@@ -18,6 +18,7 @@ iroha_data_model = { workspace = true, default-features = true, features = ["transparent_api"] }
 iroha_primitives = { workspace = true }
 iroha_config = { workspace = true }
 iroha_data_model_derive = { workspace = true }
+iroha_futures = { workspace = true }
 
 rand = { workspace = true }
 tokio = { workspace = true, features = ["rt-multi-thread", "macros", "io-util", "net", "time"] }
diff --git a/p2p/src/network.rs b/p2p/src/network.rs
index a362baa5223..e5f808ae805 100644
--- a/p2p/src/network.rs
+++ b/p2p/src/network.rs
@@ -10,6 +10,7 @@ use futures::{stream::FuturesUnordered, StreamExt};
 use iroha_config::parameters::actual::Network as Config;
 use iroha_crypto::{KeyPair, PublicKey};
 use iroha_data_model::prelude::PeerId;
+use iroha_futures::supervisor::{Child, OnShutdown};
 use iroha_logger::prelude::*;
 use iroha_primitives::addr::SocketAddr;
 use parity_scale_codec::Encode as _;
@@ -75,7 +76,7 @@ impl NetworkBaseHandle {
             address: listen_addr,
             idle_timeout,
         }: Config,
-    ) -> Result<Self, Error> {
+    ) -> Result<(Self, Child), Error> {
         // TODO: enhance the error by reporting the origin of `listen_addr`
         let listener = TcpListener::bind(listen_addr.value().to_socket_addrs()?.as_slice()).await?;
         iroha_logger::info!("Network bound to listener");
@@ -108,15 +109,18 @@ impl NetworkBaseHandle {
             _key_exchange: core::marker::PhantomData::<K>,
             _encryptor: core::marker::PhantomData::<E>,
         };
-        tokio::task::spawn(network.run());
-        Ok(Self {
-            subscribe_to_peers_messages_sender,
-            online_peers_receiver,
-            update_topology_sender,
-            network_message_sender,
-            _key_exchange: core::marker::PhantomData,
-            _encryptor: core::marker::PhantomData,
-        })
+        let child = Child::new(tokio::task::spawn(network.run()), OnShutdown::Abort);
+        Ok((
+            Self {
+                subscribe_to_peers_messages_sender,
+                online_peers_receiver,
+                update_topology_sender,
+                network_message_sender,
+                _key_exchange: core::marker::PhantomData,
+                _encryptor: core::marker::PhantomData,
+            },
+            child,
+        ))
} /// Subscribe to messages received from other peers in the network diff --git a/p2p/tests/integration/p2p.rs b/p2p/tests/integration/p2p.rs index b73c06f0117..39ef5615506 100644 --- a/p2p/tests/integration/p2p.rs +++ b/p2p/tests/integration/p2p.rs @@ -45,7 +45,7 @@ async fn network_create() { address: WithOrigin::inline(address.clone()), idle_timeout, }; - let network = NetworkHandle::start(key_pair, config).await.unwrap(); + let (network, _) = NetworkHandle::start(key_pair, config).await.unwrap(); tokio::time::sleep(delay).await; info!("Connecting to peer..."); @@ -156,7 +156,7 @@ async fn two_networks() { address: WithOrigin::inline(address1.clone()), idle_timeout, }; - let mut network1 = NetworkHandle::start(key_pair1, config1).await.unwrap(); + let (mut network1, _) = NetworkHandle::start(key_pair1, config1).await.unwrap(); info!("Starting second network..."); let address2 = socket_addr!(127.0.0.1:12_010); @@ -164,7 +164,7 @@ async fn two_networks() { address: WithOrigin::inline(address2.clone()), idle_timeout, }; - let network2 = NetworkHandle::start(key_pair2, config2).await.unwrap(); + let (network2, _) = NetworkHandle::start(key_pair2, config2).await.unwrap(); let mut messages2 = WaitForN::new(1); let actor2 = TestActor::start(messages2.clone()); @@ -302,7 +302,7 @@ async fn start_network( address: WithOrigin::inline(address), idle_timeout, }; - let mut network = NetworkHandle::start(key_pair, config).await.unwrap(); + let (mut network, _) = NetworkHandle::start(key_pair, config).await.unwrap(); network.subscribe_to_peers_messages(actor); let _ = barrier.wait().await; diff --git a/telemetry/src/dev.rs b/telemetry/src/dev.rs index 1ca1511f3a0..881bbf1b811 100644 --- a/telemetry/src/dev.rs +++ b/telemetry/src/dev.rs @@ -47,8 +47,6 @@ pub async fn start_file_output( ) })?; - // Serde doesn't support async Read Write traits. - // So let synchronous code be here. let join_handle = task::spawn(async move { while let Some(item) = stream.next().await { if let Err(error) = write_telemetry(&mut file, &item).await { @@ -61,8 +59,11 @@ pub async fn start_file_output( } async fn write_telemetry(file: &mut File, item: &FuturePollTelemetry) -> Result<()> { + // Serde doesn't support async Read Write traits. + // So let synchronous code be here. 
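+    // each record is written as a single newline-delimited JSON line (hence the '\n' pushed below)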
     let mut json =
         serde_json::to_string(&item).wrap_err("failed to serialize telemetry to JSON")?;
+    json.push('\n');
 
     file.write_all(json.as_bytes())
         .await
diff --git a/torii/src/lib.rs b/torii/src/lib.rs
index a987608ee3f..e02b34dbf62 100644
--- a/torii/src/lib.rs
+++ b/torii/src/lib.rs
@@ -10,9 +10,9 @@ use std::{
     fmt::{Debug, Write as _},
     net::ToSocketAddrs,
     sync::Arc,
+    time::Duration,
 };
 
-use futures::{stream::FuturesUnordered, StreamExt};
 use iroha_config::parameters::actual::Torii as Config;
 #[cfg(feature = "telemetry")]
 use iroha_core::metrics::MetricsReporter;
@@ -26,9 +26,9 @@ use iroha_core::{
     EventsSender,
 };
 use iroha_data_model::ChainId;
+use iroha_futures::supervisor::{Child, OnShutdown, ShutdownSignal};
 use iroha_primitives::addr::SocketAddr;
 use iroha_torii_const::uri;
-use tokio::{sync::Notify, task};
 use utils::*;
 use warp::{
     http::StatusCode,
@@ -49,7 +49,7 @@ pub struct Torii {
     kiso: KisoHandle,
     queue: Arc<Queue>,
     events: EventsSender,
-    notify_shutdown: Arc<Notify>,
+    // shutdown_cancel_token: CancellationToken,
     query_service: LiveQueryStoreHandle,
     kura: Arc<Kura>,
     transaction_max_content_length: u64,
@@ -68,7 +68,7 @@ impl Torii {
         config: Config,
         queue: Arc<Queue>,
         events: EventsSender,
-        notify_shutdown: Arc<Notify>,
+        // shutdown_cancel_token: CancellationToken,
         query_service: LiveQueryStoreHandle,
         kura: Arc<Kura>,
         state: Arc<State>,
@@ -79,7 +79,7 @@ impl Torii {
             kiso,
             queue,
             events,
-            notify_shutdown,
+            // shutdown_cancel_token,
             query_service,
             kura,
             state,
@@ -236,57 +236,34 @@ impl Torii {
             .with(warp::trace::request()))
     }
 
-    /// Start main API endpoints.
+    /// To handle incoming requests `Torii` should be started first.
     ///
     /// # Errors
     /// Can fail due to listening to network or if http server fails
-    fn start_api(self: Arc<Self>) -> eyre::Result<Vec<task::JoinHandle<()>>> {
-        let torii_address = &self.address;
-
-        let handles = torii_address
+    // #[iroha_futures::telemetry_future] // FIXME
+    pub fn start(self, shutdown_signal: &ShutdownSignal) -> eyre::Result<Vec<Child>> {
+        let children = self
+            .address
             .to_socket_addrs()?
             .map(|addr| {
-                let torii = Arc::clone(&self);
+                let api_router = self.create_api_router();
+                let shutdown_signal = shutdown_signal.clone();
 
-                let api_router = torii.create_api_router();
-                let signal_fut = async move { torii.notify_shutdown.notified().await };
                 // FIXME: warp panics if fails to bind!
                 //        handle this properly, report address origin after Axum
                 //        migration: https://github.com/hyperledger/iroha/issues/3776
                 let (_, serve_fut) =
-                    warp::serve(api_router).bind_with_graceful_shutdown(addr, signal_fut);
-
-                task::spawn(serve_fut)
+                    warp::serve(api_router).bind_with_graceful_shutdown(addr, async move {
+                        shutdown_signal.receive().await
+                    });
+                Child::new(
+                    tokio::spawn(serve_fut),
+                    OnShutdown::Wait(Duration::from_secs(5)),
+                )
             })
             .collect();
 
-        Ok(handles)
-    }
-
-    /// To handle incoming requests `Torii` should be started first.
-    ///
-    /// # Errors
-    /// Can fail due to listening to network or if http server fails
-    #[iroha_futures::telemetry_future]
-    pub async fn start(self) -> eyre::Result<()> {
-        let torii = Arc::new(self);
-        let mut handles = vec![];
-
-        handles.extend(Arc::clone(&torii).start_api()?);
-
-        handles
-            .into_iter()
-            .collect::<FuturesUnordered<_>>()
-            .for_each(|handle| {
-                if let Err(error) = handle {
-                    iroha_logger::error!(%error, "Join handle error");
-                }
-
-                futures::future::ready(())
-            })
-            .await;
-
-        Ok(())
+        Ok(children)
     }
 }
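For orientation, here is a minimal sketch of how these pieces compose, assuming the workspace crate names `iroha_futures` and `iroha_torii`; `serve_torii` is a hypothetical helper for illustration, not code from the patch:

    use iroha_futures::supervisor::Supervisor;
    use iroha_torii::Torii;

    async fn serve_torii(torii: Torii) -> eyre::Result<()> {
        let supervisor = Supervisor::new();
        // translate SIGINT/SIGTERM into the supervisor-wide shutdown signal
        supervisor.setup_shutdown_on_os_signals()?;
        // each HTTP listener spawned by Torii becomes a supervised child
        for child in torii.start(&supervisor.shutdown_signal())? {
            supervisor.monitor(child);
        }
        // resolves when all children exit; errors if any child panicked
        // or exited before the shutdown signal was sent
        supervisor.wait_all().await?;
        Ok(())
    }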