From 3103194ec4fadae337ae5c724d0739440836435c Mon Sep 17 00:00:00 2001 From: dr7ana Date: Wed, 25 Sep 2024 09:31:51 -0700 Subject: [PATCH] Better linux fd poller --- llarp/address/ip_packet.cpp | 5 +++ llarp/address/ip_packet.hpp | 2 ++ llarp/config/config.cpp | 2 +- llarp/crypto/key_manager.cpp | 1 - llarp/ev/loop.cpp | 10 +++--- llarp/ev/loop.hpp | 3 +- llarp/ev/types.cpp | 34 +++++++++++-------- llarp/ev/types.hpp | 65 +++++++++++++++++++++--------------- llarp/handlers/session.cpp | 11 +++--- llarp/handlers/tun.cpp | 65 +++++++++++++++++++----------------- llarp/handlers/tun.hpp | 4 ++- llarp/link/link_manager.cpp | 4 ++- llarp/path/path.cpp | 4 +-- llarp/path/path_handler.cpp | 2 +- llarp/router/router.cpp | 7 ++-- llarp/router/router.hpp | 4 +-- llarp/vpn/linux.hpp | 8 +++-- llarp/vpn/packet_router.cpp | 12 +++---- llarp/vpn/platform.hpp | 2 +- 19 files changed, 145 insertions(+), 100 deletions(-) diff --git a/llarp/address/ip_packet.cpp b/llarp/address/ip_packet.cpp index c283d99c67..3d1ebbccb6 100644 --- a/llarp/address/ip_packet.cpp +++ b/llarp/address/ip_packet.cpp @@ -374,4 +374,9 @@ namespace llarp return {reinterpret_cast(data()), size()}; } + std::string IPPacket::info_line() const + { + return "IPPacket (src:{}, dest:{}, size:{})"_format(_src_addr, _dst_addr, size()); + } + } // namespace llarp diff --git a/llarp/address/ip_packet.hpp b/llarp/address/ip_packet.hpp index b4d4d66d1c..3c49ef8819 100644 --- a/llarp/address/ip_packet.hpp +++ b/llarp/address/ip_packet.hpp @@ -135,6 +135,8 @@ namespace llarp ustring_view uview() const { return {data(), size()}; } std::string to_string(); + + std::string info_line() const; }; } // namespace llarp diff --git a/llarp/config/config.cpp b/llarp/config/config.cpp index c25b9c3931..74527912d8 100644 --- a/llarp/config/config.cpp +++ b/llarp/config/config.cpp @@ -1083,7 +1083,7 @@ namespace llarp if (maybe and maybe->is_loopback()) throw std::invalid_argument{"{} is a loopback address"_format(arg)}; - log::critical(logcat, "parsed address: ", *maybe); + log::critical(logcat, "parsed address: {}", *maybe); return maybe; }; diff --git a/llarp/crypto/key_manager.cpp b/llarp/crypto/key_manager.cpp index 5a4656f93d..5655e51360 100644 --- a/llarp/crypto/key_manager.cpp +++ b/llarp/crypto/key_manager.cpp @@ -24,7 +24,6 @@ namespace llarp bool KeyManager::_initialize(const Config& config, bool is_relay) { - logcat->set_level(log::Level::trace); // TESTNET: if (is_initialized) return false; diff --git a/llarp/ev/loop.cpp b/llarp/ev/loop.cpp index 5c4537e48e..723387c039 100644 --- a/llarp/ev/loop.cpp +++ b/llarp/ev/loop.cpp @@ -19,17 +19,19 @@ namespace llarp log::info(logcat, "lokinet loop shut down {}", _close_immediately ? "immediately" : "gracefully"); } - bool EventLoop::add_network_interface(std::shared_ptr netif, ip_pkt_hook handler) + std::shared_ptr EventLoop::add_network_interface( + std::shared_ptr netif, std::function hook) { (void)netif; - (void)handler; + (void)hook; + std::shared_ptr _p; #ifdef __linux__ - // + _p = _loop->template make_shared(netif->PollFD(), _loop->loop(), std::move(hook)); #else // #endif - return true; + return _p; } void EventLoop::stop(bool) diff --git a/llarp/ev/loop.hpp b/llarp/ev/loop.hpp index 1f13da872d..c9b1212acb 100644 --- a/llarp/ev/loop.hpp +++ b/llarp/ev/loop.hpp @@ -35,7 +35,8 @@ namespace llarp bool in_event_loop() const { return _loop->in_event_loop(); } - bool add_network_interface(std::shared_ptr netif, ip_pkt_hook handler); + std::shared_ptr add_network_interface( + std::shared_ptr netif, std::function hook); template void call(Callable&& f) diff --git a/llarp/ev/types.cpp b/llarp/ev/types.cpp index b6b859beab..af5ff1be02 100644 --- a/llarp/ev/types.cpp +++ b/llarp/ev/types.cpp @@ -169,19 +169,17 @@ namespace llarp log::trace(logcat, "Cooldown {}successfully began after {} attempts!", _is_cooling_down ? "" : "un", _current); } - std::shared_ptr EventPoller::make(const std::shared_ptr& _loop, std::function task) + LinuxPoller::LinuxPoller(int _fd, const loop_ptr& _loop, std::function task) + : FDPoller{_fd, std::move(task)} { - return _loop->template make_shared(_loop->loop(), std::move(task)); - } - - EventPoller::EventPoller(const loop_ptr& _loop, std::function task) : f{std::move(task)} - { - pv.reset(evwatch_prepare_new( + ev.reset(event_new( _loop.get(), - [](struct evwatch*, const struct evwatch_prepare_cb_info*, void* w) { + fd, + EV_READ | EV_PERSIST, + [](evutil_socket_t, short, void* s) { try { - auto* self = reinterpret_cast(w); + auto* self = reinterpret_cast(s); assert(self); if (not self->f) @@ -194,15 +192,25 @@ namespace llarp } catch (const std::exception& e) { - log::critical(logcat, "EventPoller caugh exception: {}", e.what()); + log::critical(logcat, "EventPoller caught exception: {}", e.what()); } }, this)); + + log::debug(logcat, "Linux poller configured to watch FD: {}", fd); } - EventPoller::~EventPoller() + bool LinuxPoller::start() { - pv.reset(); - f = nullptr; + auto rv = event_add(ev.get(), nullptr) == 0; + log::info(logcat, "Linux poller {} watching FD: {}", rv ? "successfully began" : "failed to start", fd); + return rv; + } + + bool LinuxPoller::stop() + { + auto rv = event_del(ev.get()); + log::info(logcat, "Linux poller {} watching FD: {}", rv ? "successfully stopped" : "failed to stop", fd); + return rv; } } // namespace llarp diff --git a/llarp/ev/types.hpp b/llarp/ev/types.hpp index c12f0387e7..6638a063ef 100644 --- a/llarp/ev/types.hpp +++ b/llarp/ev/types.hpp @@ -19,13 +19,6 @@ namespace llarp // shared_ptr containing the actual libev loop using loop_ptr = std::shared_ptr<::event_base>; - inline constexpr auto watch_deleter = [](::evwatch* w) { - if (w) - ::evwatch_free(w); - }; - - using watch_ptr = std::unique_ptr<::evwatch, decltype(watch_deleter)>; - /** EventTrigger This class is a parallel implementation of libquic Ticker (typedef'ed as 'EventTicker' above). Rather than invoking at regular intervals, it is manually invoked with an optional time delay. This is a useful @@ -106,35 +99,55 @@ namespace llarp bool is_cooling_down() const { return _is_cooling_down; } }; - /** EventPoller - This class is a similar implementation to EventTrigger and Ticker, with a few key differences in relation to - the net interfaces it manages. First, we don't want the execution of the logic to be tied to a specific timer or - fixed interval. Rather, this will be event triggered on packet I/O. As a result, this necessitates the second - key difference: it uses a libevent "prepare" watcher to fire immediately BEFORE polling for I/O. Libevent also - exposes the concept of a "check" watcher, which fires immediately AFTER processing active events. + /** FDPoller + This class is the base for the platform-specific Pollers that watch for IO on the virtual TUN network + interface. */ - struct EventPoller + struct FDPoller { friend class oxen::quic::Loop; - private: - EventPoller(const loop_ptr& _loop, std::function task); + // No move/copy/etc + FDPoller() = delete; + FDPoller(const FDPoller&) = delete; + FDPoller(FDPoller&&) = delete; + FDPoller& operator=(const FDPoller&) = delete; + FDPoller& operator=(FDPoller&&) = delete; + + virtual ~FDPoller() + { + ev.reset(); + f = nullptr; + } + + protected: + FDPoller(int _fd, std::function task) : fd{_fd}, f{std::move(task)} {} + + int fd; + event_ptr ev; + std::function f; public: - static std::shared_ptr make(const std::shared_ptr& _loop, std::function task); + virtual bool start() = 0; - // No move/copy/etc - EventPoller() = delete; - EventPoller(const EventTrigger&) = delete; - EventPoller(EventPoller&&) = delete; - EventPoller& operator=(const EventPoller&) = delete; - EventPoller& operator=(EventPoller&&) = delete; + virtual bool stop() = 0; + }; - ~EventPoller(); + /** LinuxPoller + This class is a linux-specific extension of the Base poller type. + */ + struct LinuxPoller final : public FDPoller + { + friend class EventLoop; + friend class oxen::quic::Loop; private: - watch_ptr pv; - std::function f; + LinuxPoller(int _fd, const loop_ptr& _loop, std::function task); + + public: + bool start() override; + + bool stop() override; }; } // namespace llarp diff --git a/llarp/handlers/session.cpp b/llarp/handlers/session.cpp index 54c4a6f61a..fe925a6f54 100644 --- a/llarp/handlers/session.cpp +++ b/llarp/handlers/session.cpp @@ -392,6 +392,8 @@ namespace llarp::handlers else { // TODO: if this fails, we should close the session + log::warning(logcat, "TUN devcice failed to route session (remote: {}) to local ip", session->remote()); + ret = false; } } else @@ -445,7 +447,7 @@ namespace llarp::handlers path->pivot_txid(), fetch_auth_token(remote), _router.using_tun_if()), - [this, remote, tag, path, hook = std::move(cb), is_exit](std::string response) { + [this, remote, tag, path, hook = std::move(cb), is_exit](std::string response) mutable { if (response == messages::OK_RESPONSE) { auto outbound = std::make_shared( @@ -525,7 +527,7 @@ namespace llarp::handlers if (not build3( path->upstream_rid(), std::move(payload), - [this, path, intros, remote, hook = std::move(cb), is_exit](oxen::quic::message m) { + [this, path, intros, remote, hook = std::move(cb), is_exit](oxen::quic::message m) mutable { if (m) { // Do not call ::add_path() or ::path_build_succeeded() here; OutboundSession constructor will @@ -570,12 +572,13 @@ namespace llarp::handlers auto counter = std::make_shared(path::DEFAULT_PATHS_HELD); - _router.loop()->call([this, remote, handler = std::move(cb), is_exit, counter]() { + _router.loop()->call([this, remote, handler = std::move(cb), is_exit, counter]() mutable { lookup_intro( remote.router_id(), false, 0, - [this, remote, hook = std::move(handler), is_exit, counter](std::optional intro) { + [this, remote, hook = std::move(handler), is_exit, counter]( + std::optional intro) mutable { // already have a successful return if (*counter == 0) return; diff --git a/llarp/handlers/tun.cpp b/llarp/handlers/tun.cpp index cfa1447297..e4b8a34ae9 100644 --- a/llarp/handlers/tun.cpp +++ b/llarp/handlers/tun.cpp @@ -340,6 +340,9 @@ namespace llarp::handlers } } + log::debug(logcat, "Tun constructing IPRange iterator on local range: {}", _local_range); + _local_range_iterator = IPRangeIterator(_local_range); + _local_netaddr = NetworkAddress::from_pubkey(_router.local_rid(), not _router.is_service_node()); _local_ip_mapping.insert_or_assign(_local_base_ip, std::move(_local_netaddr)); @@ -356,34 +359,6 @@ namespace llarp::handlers log::debug(logcat, "{} setting up network...", name()); - _net_if = router().vpn_platform()->CreateInterface(std::move(info), &_router); - - _if_name = _net_if->interface_info().ifname; - log::info(logcat, "{} got network interface:{}", name(), _if_name); - - auto if_hook = [netif = _net_if, pktrouter = _packet_router]() mutable { - auto pkt = netif->read_next_packet(); - - while (not pkt.empty()) - { - pktrouter->handle_ip_packet(std::move(pkt)); - pkt = netif->read_next_packet(); - } - }; - - auto handle_packet = [netif = _net_if, pktrouter = _packet_router](IPPacket pkt) { - // TODO: packets used to have reply hooks - // pkt.reply = [netif](auto pkt) { netif->write_packet(std::move(pkt)); }; - pktrouter->handle_ip_packet(std::move(pkt)); - }; - - if (not router().loop()->add_network_interface(_net_if, std::move(handle_packet))) - { - auto err = "{} failed to add network interface!"_format(name()); - log::error(logcat, "{}", err); - throw std::runtime_error{std::move(err)}; - } - _local_ipv6 = ipv6_enabled ? _local_addr : _local_addr.mapped_ipv4_as_ipv6(); if (ipv6_enabled) @@ -400,6 +375,26 @@ namespace llarp::handlers log::info( logcat, "{} has interface ipv4 address ({}) with ipv6 address ({})", name(), _local_addr, _local_ipv6); + _net_if = router().vpn_platform()->create_interface(std::move(info), &_router); + _if_name = _net_if->interface_info().ifname; + + log::info(logcat, "{} got network interface:{}", name(), _if_name); + + auto pkt_hook = [this]() { + for (auto pkt = _net_if->read_next_packet(); not pkt.empty(); pkt = _net_if->read_next_packet()) + { + log::debug(logcat, "packet router receiving {}", pkt.info_line()); + _packet_router->handle_ip_packet(std::move(pkt)); + } + }; + + if (_poller = router().loop()->add_network_interface(_net_if, std::move(pkt_hook)); not _poller) + { + auto err = "{} failed to add network interface!"_format(name()); + log::error(logcat, "{}", err); + throw std::runtime_error{std::move(err)}; + } + // if (auto* quic = GetQUICTunnel()) // { // TODO: @@ -1015,11 +1010,12 @@ namespace llarp::handlers { log::debug(logcat, "Dispatching outbound packet for session (remote: {})", remote); session->send_path_data_message(std::move(pkt).steal_payload()); - return; } - throw std::runtime_error{"Could not find session (remote: {}) for outbound packet!"_format(remote)}; + else + log::warning(logcat, "Could not find session (remote: {}) for outbound packet!", remote); } - throw std::runtime_error{"Could not find remote mapped to local ip: {}"_format(dest)}; + else + log::debug(logcat, "Could not find remote for route {}", pkt.info_line()); } bool TunEndpoint::obtain_src_for_remote(const NetworkAddress& remote, ip_v& src, bool use_ipv4) @@ -1145,6 +1141,13 @@ namespace llarp::handlers return true; } + void TunEndpoint::start_poller() + { + if (not _poller->start()) + throw std::runtime_error{"TUN failed to start FD poller!"}; + log::debug(logcat, "TUN successfully started FD poller!"); + } + bool TunEndpoint::is_allowing_traffic(const IPPacket& pkt) const { if (auto exitPolicy = get_traffic_policy()) diff --git a/llarp/handlers/tun.hpp b/llarp/handlers/tun.hpp index e9a07bc0c0..e64256bae5 100644 --- a/llarp/handlers/tun.hpp +++ b/llarp/handlers/tun.hpp @@ -54,7 +54,7 @@ namespace llarp::handlers std::optional _base_ipv6_range = std::nullopt; std::shared_ptr _net_if; - std::shared_ptr _pkt_watcher; + std::shared_ptr _poller; std::shared_ptr _packet_router; @@ -145,6 +145,8 @@ namespace llarp::handlers Router& router() { return _router; } + void start_poller(); + // protected: struct WritePacket { diff --git a/llarp/link/link_manager.cpp b/llarp/link/link_manager.cpp index d17585e9dd..fd21b9904d 100644 --- a/llarp/link/link_manager.cpp +++ b/llarp/link/link_manager.cpp @@ -1963,6 +1963,7 @@ namespace llarp HopID pivot_txid; bool use_tun; std::optional maybe_auth = std::nullopt; + std::shared_ptr path_ptr; try { @@ -1977,7 +1978,7 @@ namespace llarp return m.respond(InitiateSession::AUTH_DENIED, true); } - auto path_ptr = _router.path_context()->get_path(pivot_txid); + path_ptr = _router.path_context()->get_path(pivot_txid); if (not path_ptr) { @@ -1998,6 +1999,7 @@ namespace llarp log::warning(logcat, "Exception: {}", e.what()); } + _router.path_context()->drop_path(path_ptr); m.respond(messages::ERROR_RESPONSE, true); } diff --git a/llarp/path/path.cpp b/llarp/path/path.cpp index d664375fb7..ccae861410 100644 --- a/llarp/path/path.cpp +++ b/llarp/path/path.cpp @@ -184,7 +184,7 @@ namespace llarp::path upstream_rid(), "path_control", std::move(outer_payload), - [response_cb = std::move(func), weak = weak_from_this()](oxen::quic::message m) { + [response_cb = std::move(func), weak = weak_from_this()](oxen::quic::message m) mutable { auto self = weak.lock(); // TODO: do we want to allow empty callback here? if ((not self) or (not response_cb)) @@ -301,7 +301,7 @@ namespace llarp::path std::string Path::to_string() const { return "RID:{} -- TX:{}/RX:{}"_format( - _router.local_rid(), upstream_txid().to_view(), upstream_rxid().to_view()); + _router.local_rid().ShortString(), upstream_txid().to_string(), upstream_rxid().to_string()); } std::string Path::HopsString() const diff --git a/llarp/path/path_handler.cpp b/llarp/path/path_handler.cpp index 4ae9576bf3..b185f5f59e 100644 --- a/llarp/path/path_handler.cpp +++ b/llarp/path/path_handler.cpp @@ -517,7 +517,7 @@ namespace llarp::path if (auto [it, b] = _paths.try_emplace(terminus, nullptr); not b) { - log::error(logcat, "Pending build to {} already underway... aborting...", terminus); + log::warning(logcat, "Pending build to {} already underway... aborting...", terminus); return false; } } diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index dd3f50d350..fbf15dac5a 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -180,6 +180,10 @@ namespace llarp else log::debug(logcat, "Main event loop ticker already auto-started!"); + // TESTNET: + if (_using_tun) + _tun->start_poller(); + if (not _systemd_ticker->is_running()) { if (not _systemd_ticker->start()) @@ -290,7 +294,6 @@ namespace llarp else { log::debug(logcat, "Client holding identity key generated by key manager..."); - // _identity = _key_manager->identity_key; } return true; @@ -676,7 +679,7 @@ namespace llarp // Full clients have TUN // Embedded clients have nothing // All relays have TUN - if (_should_init_tun = conf.network.init_tun; _should_init_tun) + if (_using_tun = conf.network.init_tun; _using_tun) { log::critical(logcat, "Initializing virtual TUN device..."); init_tun(); diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index ddbf804349..8b999926a6 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -109,7 +109,7 @@ namespace llarp bool _testing_disabled{false}; bool _testnet{false}; bool _bootstrap_seed{false}; - bool _should_init_tun{false}; + bool _using_tun{false}; consensus::reachability_testing router_testing; @@ -203,7 +203,7 @@ namespace llarp bool fully_meshed() const; - bool using_tun_if() const { return _should_init_tun; } + bool using_tun_if() const { return _using_tun; } bool testnet() const { return _testnet; } diff --git a/llarp/vpn/linux.hpp b/llarp/vpn/linux.hpp index 6d9b38c985..55c3ab3fc8 100644 --- a/llarp/vpn/linux.hpp +++ b/llarp/vpn/linux.hpp @@ -45,6 +45,9 @@ namespace llarp::vpn if (_fd == -1) throw std::runtime_error("cannot open /dev/net/tun " + std::string{strerror(errno)}); + if (fcntl(_fd, F_SETFL, O_NONBLOCK) == -1) + throw std::runtime_error{"Failed to set Linux interface fd non-block!s"}; + ifreq ifr{}; in6_ifreq ifr6{}; @@ -213,9 +216,8 @@ namespace llarp::vpn { family = AF_INET6; bitlen = 128; - - auto bigly = oxenc::host_to_big(v6.hi); - inet_ntop(AF_INET6, &bigly, reinterpret_cast(data), sizeof(struct in6_addr)); + auto in6 = v6.to_in6(); + std::memcpy(&data, &in6, sizeof(in6_addr)); } _inet_addr(net::ipv4addr_t addr, size_t bits = 32) diff --git a/llarp/vpn/packet_router.cpp b/llarp/vpn/packet_router.cpp index 1632907e3b..817c0a4967 100644 --- a/llarp/vpn/packet_router.cpp +++ b/llarp/vpn/packet_router.cpp @@ -50,15 +50,15 @@ namespace llarp::vpn void PacketRouter::handle_ip_packet(IPPacket pkt) { auto dest_port = pkt.dest_port(); + if (not dest_port) return _handler(std::move(pkt)); - {} - // const auto proto = pkt.Header()->protocol; - // if (const auto itr = _ip_proto_handler.find(proto); itr != _ip_proto_handler.end()) - // itr->second->HandleIPPacket(std::move(pkt)); - // else - // _handler(std::move(pkt)); + auto proto = pkt.protocol(); + if (const auto itr = _ip_proto_handler.find(*proto); itr != _ip_proto_handler.end()) + itr->second->handle_ip_packet(std::move(pkt)); + else + _handler(std::move(pkt)); } void PacketRouter::add_udp_handler(uint16_t localport, ip_pkt_hook func) diff --git a/llarp/vpn/platform.hpp b/llarp/vpn/platform.hpp index 2ca8056c2c..4b79670767 100644 --- a/llarp/vpn/platform.hpp +++ b/llarp/vpn/platform.hpp @@ -108,7 +108,7 @@ namespace llarp::vpn virtual ~Platform() = default; /// create and start a network interface - std::shared_ptr CreateInterface(InterfaceInfo info, Router* router) + std::shared_ptr create_interface(InterfaceInfo info, Router* router) { if (auto netif = obtain_interface(std::move(info), router)) {