Skip to content

Commit

Permalink
Better linux fd poller
Browse files Browse the repository at this point in the history
  • Loading branch information
dr7ana committed Sep 25, 2024
1 parent 8495517 commit 3103194
Show file tree
Hide file tree
Showing 19 changed files with 145 additions and 100 deletions.
5 changes: 5 additions & 0 deletions llarp/address/ip_packet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,4 +374,9 @@ namespace llarp
return {reinterpret_cast<const char*>(data()), size()};
}

std::string IPPacket::info_line() const
{
return "IPPacket (src:{}, dest:{}, size:{})"_format(_src_addr, _dst_addr, size());
}

} // namespace llarp
2 changes: 2 additions & 0 deletions llarp/address/ip_packet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ namespace llarp
ustring_view uview() const { return {data(), size()}; }

std::string to_string();

std::string info_line() const;
};

} // namespace llarp
2 changes: 1 addition & 1 deletion llarp/config/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,7 @@ namespace llarp
if (maybe and maybe->is_loopback())
throw std::invalid_argument{"{} is a loopback address"_format(arg)};

log::critical(logcat, "parsed address: ", *maybe);
log::critical(logcat, "parsed address: {}", *maybe);

return maybe;
};
Expand Down
1 change: 0 additions & 1 deletion llarp/crypto/key_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ namespace llarp

bool KeyManager::_initialize(const Config& config, bool is_relay)
{
logcat->set_level(log::Level::trace); // TESTNET:
if (is_initialized)
return false;

Expand Down
10 changes: 6 additions & 4 deletions llarp/ev/loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,19 @@ namespace llarp
log::info(logcat, "lokinet loop shut down {}", _close_immediately ? "immediately" : "gracefully");
}

bool EventLoop::add_network_interface(std::shared_ptr<vpn::NetworkInterface> netif, ip_pkt_hook handler)
std::shared_ptr<FDPoller> EventLoop::add_network_interface(
std::shared_ptr<vpn::NetworkInterface> netif, std::function<void()> hook)
{
(void)netif;
(void)handler;
(void)hook;
std::shared_ptr<FDPoller> _p;

#ifdef __linux__
//
_p = _loop->template make_shared<LinuxPoller>(netif->PollFD(), _loop->loop(), std::move(hook));
#else
//
#endif
return true;
return _p;
}

void EventLoop::stop(bool)
Expand Down
3 changes: 2 additions & 1 deletion llarp/ev/loop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ namespace llarp

bool in_event_loop() const { return _loop->in_event_loop(); }

bool add_network_interface(std::shared_ptr<vpn::NetworkInterface> netif, ip_pkt_hook handler);
std::shared_ptr<FDPoller> add_network_interface(
std::shared_ptr<vpn::NetworkInterface> netif, std::function<void()> hook);

template <typename Callable>
void call(Callable&& f)
Expand Down
34 changes: 21 additions & 13 deletions llarp/ev/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,19 +169,17 @@ namespace llarp
log::trace(logcat, "Cooldown {}successfully began after {} attempts!", _is_cooling_down ? "" : "un", _current);
}

std::shared_ptr<EventPoller> EventPoller::make(const std::shared_ptr<EventLoop>& _loop, std::function<void()> task)
LinuxPoller::LinuxPoller(int _fd, const loop_ptr& _loop, std::function<void()> task)
: FDPoller{_fd, std::move(task)}
{
return _loop->template make_shared<EventPoller>(_loop->loop(), std::move(task));
}

EventPoller::EventPoller(const loop_ptr& _loop, std::function<void()> task) : f{std::move(task)}
{
pv.reset(evwatch_prepare_new(
ev.reset(event_new(
_loop.get(),
[](struct evwatch*, const struct evwatch_prepare_cb_info*, void* w) {
fd,
EV_READ | EV_PERSIST,
[](evutil_socket_t, short, void* s) {
try
{
auto* self = reinterpret_cast<EventPoller*>(w);
auto* self = reinterpret_cast<LinuxPoller*>(s);
assert(self);

if (not self->f)
Expand All @@ -194,15 +192,25 @@ namespace llarp
}
catch (const std::exception& e)
{
log::critical(logcat, "EventPoller caugh exception: {}", e.what());
log::critical(logcat, "EventPoller caught exception: {}", e.what());
}
},
this));

log::debug(logcat, "Linux poller configured to watch FD: {}", fd);
}

EventPoller::~EventPoller()
bool LinuxPoller::start()
{
pv.reset();
f = nullptr;
auto rv = event_add(ev.get(), nullptr) == 0;
log::info(logcat, "Linux poller {} watching FD: {}", rv ? "successfully began" : "failed to start", fd);
return rv;
}

bool LinuxPoller::stop()
{
auto rv = event_del(ev.get());
log::info(logcat, "Linux poller {} watching FD: {}", rv ? "successfully stopped" : "failed to stop", fd);
return rv;
}
} // namespace llarp
65 changes: 39 additions & 26 deletions llarp/ev/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,6 @@ namespace llarp
// shared_ptr containing the actual libev loop
using loop_ptr = std::shared_ptr<::event_base>;

inline constexpr auto watch_deleter = [](::evwatch* w) {
if (w)
::evwatch_free(w);
};

using watch_ptr = std::unique_ptr<::evwatch, decltype(watch_deleter)>;

/** EventTrigger
This class is a parallel implementation of libquic Ticker (typedef'ed as 'EventTicker' above). Rather than
invoking at regular intervals, it is manually invoked with an optional time delay. This is a useful
Expand Down Expand Up @@ -106,35 +99,55 @@ namespace llarp
bool is_cooling_down() const { return _is_cooling_down; }
};

/** EventPoller
This class is a similar implementation to EventTrigger and Ticker, with a few key differences in relation to
the net interfaces it manages. First, we don't want the execution of the logic to be tied to a specific timer or
fixed interval. Rather, this will be event triggered on packet I/O. As a result, this necessitates the second
key difference: it uses a libevent "prepare" watcher to fire immediately BEFORE polling for I/O. Libevent also
exposes the concept of a "check" watcher, which fires immediately AFTER processing active events.
/** FDPoller
This class is the base for the platform-specific Pollers that watch for IO on the virtual TUN network
interface.
*/
struct EventPoller
struct FDPoller
{
friend class oxen::quic::Loop;

private:
EventPoller(const loop_ptr& _loop, std::function<void()> task);
// No move/copy/etc
FDPoller() = delete;
FDPoller(const FDPoller&) = delete;
FDPoller(FDPoller&&) = delete;
FDPoller& operator=(const FDPoller&) = delete;
FDPoller& operator=(FDPoller&&) = delete;

virtual ~FDPoller()
{
ev.reset();
f = nullptr;
}

protected:
FDPoller(int _fd, std::function<void()> task) : fd{_fd}, f{std::move(task)} {}

int fd;
event_ptr ev;
std::function<void()> f;

public:
static std::shared_ptr<EventPoller> make(const std::shared_ptr<EventLoop>& _loop, std::function<void()> task);
virtual bool start() = 0;

// No move/copy/etc
EventPoller() = delete;
EventPoller(const EventTrigger&) = delete;
EventPoller(EventPoller&&) = delete;
EventPoller& operator=(const EventPoller&) = delete;
EventPoller& operator=(EventPoller&&) = delete;
virtual bool stop() = 0;
};

~EventPoller();
/** LinuxPoller
This class is a linux-specific extension of the Base poller type.
*/
struct LinuxPoller final : public FDPoller
{
friend class EventLoop;
friend class oxen::quic::Loop;

private:
watch_ptr pv;
std::function<void()> f;
LinuxPoller(int _fd, const loop_ptr& _loop, std::function<void()> task);

public:
bool start() override;

bool stop() override;
};

} // namespace llarp
11 changes: 7 additions & 4 deletions llarp/handlers/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,8 @@ namespace llarp::handlers
else
{
// TODO: if this fails, we should close the session
log::warning(logcat, "TUN devcice failed to route session (remote: {}) to local ip", session->remote());
ret = false;
}
}
else
Expand Down Expand Up @@ -445,7 +447,7 @@ namespace llarp::handlers
path->pivot_txid(),
fetch_auth_token(remote),
_router.using_tun_if()),
[this, remote, tag, path, hook = std::move(cb), is_exit](std::string response) {
[this, remote, tag, path, hook = std::move(cb), is_exit](std::string response) mutable {
if (response == messages::OK_RESPONSE)
{
auto outbound = std::make_shared<session::OutboundSession>(
Expand Down Expand Up @@ -525,7 +527,7 @@ namespace llarp::handlers
if (not build3(
path->upstream_rid(),
std::move(payload),
[this, path, intros, remote, hook = std::move(cb), is_exit](oxen::quic::message m) {
[this, path, intros, remote, hook = std::move(cb), is_exit](oxen::quic::message m) mutable {
if (m)
{
// Do not call ::add_path() or ::path_build_succeeded() here; OutboundSession constructor will
Expand Down Expand Up @@ -570,12 +572,13 @@ namespace llarp::handlers

auto counter = std::make_shared<size_t>(path::DEFAULT_PATHS_HELD);

_router.loop()->call([this, remote, handler = std::move(cb), is_exit, counter]() {
_router.loop()->call([this, remote, handler = std::move(cb), is_exit, counter]() mutable {
lookup_intro(
remote.router_id(),
false,
0,
[this, remote, hook = std::move(handler), is_exit, counter](std::optional<service::IntroSet> intro) {
[this, remote, hook = std::move(handler), is_exit, counter](
std::optional<service::IntroSet> intro) mutable {
// already have a successful return
if (*counter == 0)
return;
Expand Down
65 changes: 34 additions & 31 deletions llarp/handlers/tun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,9 @@ namespace llarp::handlers
}
}

log::debug(logcat, "Tun constructing IPRange iterator on local range: {}", _local_range);
_local_range_iterator = IPRangeIterator(_local_range);

_local_netaddr = NetworkAddress::from_pubkey(_router.local_rid(), not _router.is_service_node());
_local_ip_mapping.insert_or_assign(_local_base_ip, std::move(_local_netaddr));

Expand All @@ -356,34 +359,6 @@ namespace llarp::handlers

log::debug(logcat, "{} setting up network...", name());

_net_if = router().vpn_platform()->CreateInterface(std::move(info), &_router);

_if_name = _net_if->interface_info().ifname;
log::info(logcat, "{} got network interface:{}", name(), _if_name);

auto if_hook = [netif = _net_if, pktrouter = _packet_router]() mutable {
auto pkt = netif->read_next_packet();

while (not pkt.empty())
{
pktrouter->handle_ip_packet(std::move(pkt));
pkt = netif->read_next_packet();
}
};

auto handle_packet = [netif = _net_if, pktrouter = _packet_router](IPPacket pkt) {
// TODO: packets used to have reply hooks
// pkt.reply = [netif](auto pkt) { netif->write_packet(std::move(pkt)); };
pktrouter->handle_ip_packet(std::move(pkt));
};

if (not router().loop()->add_network_interface(_net_if, std::move(handle_packet)))
{
auto err = "{} failed to add network interface!"_format(name());
log::error(logcat, "{}", err);
throw std::runtime_error{std::move(err)};
}

_local_ipv6 = ipv6_enabled ? _local_addr : _local_addr.mapped_ipv4_as_ipv6();

if (ipv6_enabled)
Expand All @@ -400,6 +375,26 @@ namespace llarp::handlers
log::info(
logcat, "{} has interface ipv4 address ({}) with ipv6 address ({})", name(), _local_addr, _local_ipv6);

_net_if = router().vpn_platform()->create_interface(std::move(info), &_router);
_if_name = _net_if->interface_info().ifname;

log::info(logcat, "{} got network interface:{}", name(), _if_name);

auto pkt_hook = [this]() {
for (auto pkt = _net_if->read_next_packet(); not pkt.empty(); pkt = _net_if->read_next_packet())
{
log::debug(logcat, "packet router receiving {}", pkt.info_line());
_packet_router->handle_ip_packet(std::move(pkt));
}
};

if (_poller = router().loop()->add_network_interface(_net_if, std::move(pkt_hook)); not _poller)
{
auto err = "{} failed to add network interface!"_format(name());
log::error(logcat, "{}", err);
throw std::runtime_error{std::move(err)};
}

// if (auto* quic = GetQUICTunnel())
// {
// TODO:
Expand Down Expand Up @@ -1015,11 +1010,12 @@ namespace llarp::handlers
{
log::debug(logcat, "Dispatching outbound packet for session (remote: {})", remote);
session->send_path_data_message(std::move(pkt).steal_payload());
return;
}
throw std::runtime_error{"Could not find session (remote: {}) for outbound packet!"_format(remote)};
else
log::warning(logcat, "Could not find session (remote: {}) for outbound packet!", remote);
}
throw std::runtime_error{"Could not find remote mapped to local ip: {}"_format(dest)};
else
log::debug(logcat, "Could not find remote for route {}", pkt.info_line());
}

bool TunEndpoint::obtain_src_for_remote(const NetworkAddress& remote, ip_v& src, bool use_ipv4)
Expand Down Expand Up @@ -1145,6 +1141,13 @@ namespace llarp::handlers
return true;
}

void TunEndpoint::start_poller()
{
if (not _poller->start())
throw std::runtime_error{"TUN failed to start FD poller!"};
log::debug(logcat, "TUN successfully started FD poller!");
}

bool TunEndpoint::is_allowing_traffic(const IPPacket& pkt) const
{
if (auto exitPolicy = get_traffic_policy())
Expand Down
4 changes: 3 additions & 1 deletion llarp/handlers/tun.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ namespace llarp::handlers
std::optional<IPRange> _base_ipv6_range = std::nullopt;

std::shared_ptr<vpn::NetworkInterface> _net_if;
std::shared_ptr<EventPoller> _pkt_watcher;
std::shared_ptr<FDPoller> _poller;

std::shared_ptr<vpn::PacketRouter> _packet_router;

Expand Down Expand Up @@ -145,6 +145,8 @@ namespace llarp::handlers

Router& router() { return _router; }

void start_poller();

// protected:
struct WritePacket
{
Expand Down
Loading

0 comments on commit 3103194

Please sign in to comment.