diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index f317d63592..7520d827dd 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -20,28 +20,44 @@ stages: # - "lokinet" # we'll just try our travis set up for now -build:linux: - image: ubuntu:latest +build:linux_release: + image: ubuntu:xenial tags: - linux stage: build before_script: - - apt-get update && apt-get install -y build-essential cmake git libcap-dev bsdmainutils ninja-build curl git ca-certificates ccache libuv1-dev libsodium-dev + - apt-get update && apt-get install -y binutils-gold build-essential bsdmainutils ca-certificates ccache cmake curl git libcap-dev libcurl4-openssl-dev libsodium-dev libuv1-dev ninja-build script: - - make STATIC_LINK=OFF + - DOWNLOAD_SODIUM=ON BUILD_TYPE=Release STATIC_LINK=OFF NINJA=ninja make artifacts: paths: - - "lokinet" + - "build/daemon/lokinet" + - "build/daemon/lokinetctl" -build:freebsd: +build:linux_debug: + image: ubuntu:xenial tags: - - freebsd + - linux stage: build + before_script: + - apt-get update && apt-get install -y binutils-gold build-essential bsdmainutils ca-certificates ccache cmake curl git libcap-dev libcurl4-openssl-dev libsodium-dev libuv1-dev ninja-build script: - - gmake + - DOWNLOAD_SODIUM=ON BUILD_TYPE=Debug IS_NOTIFICATION=1 STATIC_LINK=OFF NINJA=ninja make artifacts: paths: - "lokinet" + - "lokinetctl" + +# needs libsodium (probably libuv and libcurl too) +#build:freebsd: +# tags: +# - freebsd +# stage: build +# script: +# - gmake +# artifacts: +# paths: +# - "lokinet" #build:windows: # tags: diff --git a/.travis.yml b/.travis.yml index d6ec2d4efa..cf474983cc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -181,7 +181,7 @@ script: elif [[ ! -z $DOCKER_FILE ]]; then travis_wait docker build -f $DOCKER_FILE .; else - travis_wait make ${MAKE_TARGET:-test}; + travis_wait make DOWNLOAD_SODIUM=ON ${MAKE_TARGET:-test}; fi after_script: diff --git a/include/llarp.h b/include/llarp.h index 6d040f6d28..a50300cee8 100644 --- a/include/llarp.h +++ b/include/llarp.h @@ -3,15 +3,6 @@ #include #include -#if defined(_WIN32) -#ifdef _MSC_VER -#include -typedef SSIZE_T ssize_t; -#else -#define ssize_t long -#endif -#endif - #ifdef __cplusplus extern "C" { diff --git a/llarp/CMakeLists.txt b/llarp/CMakeLists.txt index bb9f2632db..9d4c9e30ea 100644 --- a/llarp/CMakeLists.txt +++ b/llarp/CMakeLists.txt @@ -53,7 +53,6 @@ set(LIB_UTIL_SRC util/thread/thread_pool.cpp util/thread/threading.cpp util/thread/threadpool.cpp - util/thread/timer.cpp util/thread/timerqueue.cpp util/time.cpp util/types.cpp diff --git a/llarp/config/config.cpp b/llarp/config/config.cpp index 87589c7a58..be5634f60c 100644 --- a/llarp/config/config.cpp +++ b/llarp/config/config.cpp @@ -625,9 +625,9 @@ llarp_generic_ensure_config(std::ofstream &f, std::string basepath, f << "\n\n"; - f << "# admin api (disabled by default)\n"; + f << "# admin api\n"; f << "[api]\n"; - f << "enabled=false\n"; + f << "enabled=true\n"; f << "#authkey=insertpubkey1here\n"; f << "#authkey=insertpubkey2here\n"; f << "#authkey=insertpubkey3here\n"; diff --git a/llarp/context.cpp b/llarp/context.cpp index 4f57821f09..86a6ba5d08 100644 --- a/llarp/context.cpp +++ b/llarp/context.cpp @@ -202,6 +202,7 @@ __ ___ ____ _ _ ___ _ _ ____ llarp::LogInfo(llarp::VERSION_FULL, " ", llarp::RELEASE_MOTTO); llarp::LogInfo("starting up"); mainloop = llarp_make_ev_loop(); + logic->set_event_loop(mainloop.get()); mainloop->set_logic(logic); diff --git a/llarp/dht/context.cpp b/llarp/dht/context.cpp index 1f95b01209..6565f6f385 100644 --- a/llarp/dht/context.cpp +++ b/llarp/dht/context.cpp @@ -153,11 +153,11 @@ namespace llarp GetIntroSetByServiceAddress( const llarp::service::Address& addr) const override; - static void - handle_cleaner_timer(void* user, uint64_t orig, uint64_t left); + void + handle_cleaner_timer(uint64_t interval); - static void - handle_explore_timer(void* user, uint64_t orig, uint64_t left); + void + handle_explore_timer(uint64_t interval); /// explore dht for new routers void @@ -338,34 +338,28 @@ namespace llarp } void - Context::handle_explore_timer(void* u, uint64_t orig, uint64_t left) + Context::handle_explore_timer(uint64_t interval) { - if(left) - return; - auto* ctx = static_cast< Context* >(u); - const auto num = - std::min(ctx->router->NumberOfConnectedRouters(), size_t(4)); + const auto num = std::min(router->NumberOfConnectedRouters(), size_t(4)); if(num) - ctx->Explore(num); - ctx->router->logic()->call_later({orig, ctx, &handle_explore_timer}); + Explore(num); + router->logic()->call_later( + interval, + std::bind(&llarp::dht::Context::handle_explore_timer, this, + interval)); } void - Context::handle_cleaner_timer(void* u, - __attribute__((unused)) uint64_t orig, - uint64_t left) + Context::handle_cleaner_timer(__attribute__((unused)) uint64_t interval) { - if(left) - return; - auto* ctx = static_cast< Context* >(u); // clean up transactions - ctx->CleanupTX(); + CleanupTX(); - if(ctx->_services) + if(_services) { // expire intro sets - auto now = ctx->Now(); - auto& nodes = ctx->_services->nodes; + auto now = Now(); + auto& nodes = _services->nodes; auto itr = nodes.begin(); while(itr != nodes.end()) { @@ -378,7 +372,7 @@ namespace llarp ++itr; } } - ctx->ScheduleCleanupTimer(); + ScheduleCleanupTimer(); } std::set< service::IntroSet > @@ -528,7 +522,9 @@ namespace llarp // start exploring r->logic()->call_later( - {exploreInterval, this, &llarp::dht::Context::handle_explore_timer}); + exploreInterval, + std::bind(&llarp::dht::Context::handle_explore_timer, this, + exploreInterval)); // start cleanup timer ScheduleCleanupTimer(); } @@ -536,7 +532,9 @@ namespace llarp void Context::ScheduleCleanupTimer() { - router->logic()->call_later({1000, this, &handle_cleaner_timer}); + router->logic()->call_later( + 1000, + std::bind(&llarp::dht::Context::handle_cleaner_timer, this, 1000)); } void diff --git a/llarp/ev/ev.cpp b/llarp/ev/ev.cpp index 17872533e1..4eb65ae8cd 100644 --- a/llarp/ev/ev.cpp +++ b/llarp/ev/ev.cpp @@ -31,13 +31,9 @@ llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev, { ev->update_time(); ev->tick(EV_TICK_INTERVAL); - if(ev->running()) - { - ev->update_time(); - logic->tick(ev->time_now()); - } llarp::LogContext::Instance().logStream->Tick(ev->time_now()); } + logic->clear_event_loop(); ev->stopped(); } diff --git a/llarp/ev/ev.h b/llarp/ev/ev.h index 22dc5443b7..e40d5cdf9d 100644 --- a/llarp/ev/ev.h +++ b/llarp/ev/ev.h @@ -9,16 +9,6 @@ #include #include #include - -#if defined(_WIN32) -#ifdef _MSC_VER -#include -typedef SSIZE_T ssize_t; -#else -#define ssize_t long -#endif -#endif - #else #include #include diff --git a/llarp/ev/ev.hpp b/llarp/ev/ev.hpp index 7e040104e6..b10400ebd0 100644 --- a/llarp/ev/ev.hpp +++ b/llarp/ev/ev.hpp @@ -741,6 +741,13 @@ struct llarp_ev_loop virtual int tick(int ms) = 0; + virtual uint32_t + call_after_delay(llarp_time_t delay_ms, + std::function< void(void) > callback) = 0; + + virtual void + cancel_delayed_call(uint32_t call_id) = 0; + virtual bool add_ticker(std::function< void(void) > ticker) = 0; diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp index f172e0063a..d8c86048be 100644 --- a/llarp/ev/ev_libuv.cpp +++ b/llarp/ev/ev_libuv.cpp @@ -793,6 +793,23 @@ namespace libuv } }; #endif + + static void + OnAsyncWake(uv_async_t* async_handle) + { + Loop* loop = static_cast< Loop* >(async_handle->data); + loop->process_timer_queue(); + loop->process_cancel_queue(); + } + + Loop::Loop() + : llarp_ev_loop() + , m_LogicCalls(1024) + , m_timerQueue(20) + , m_timerCancelQueue(20) + { + } + bool Loop::init() { @@ -809,7 +826,6 @@ namespace libuv #else uv_loop_configure(&m_Impl, UV_LOOP_BLOCK_SIGNAL, SIGPIPE); #endif - m_TickTimer.data = this; m_LogicCaller.data = this; uv_async_init(&m_Impl, &m_LogicCaller, [](uv_async_t* h) { Loop* l = static_cast< Loop* >(h->data); @@ -819,8 +835,14 @@ namespace libuv f(); } }); + m_TickTimer = new uv_timer_t; + m_TickTimer->data = this; m_Run.store(true); - return uv_timer_init(&m_Impl, &m_TickTimer) != -1; + m_nextID.store(0); + + m_WakeUp.data = this; + uv_async_init(&m_Impl, &m_WakeUp, &OnAsyncWake); + return uv_timer_init(&m_Impl, m_TickTimer) != -1; } void @@ -859,12 +881,107 @@ namespace libuv { if(m_Run) { - uv_timer_start(&m_TickTimer, &OnTickTimeout, ms, 0); + uv_timer_start(m_TickTimer, &OnTickTimeout, ms, 0); uv_run(&m_Impl, UV_RUN_ONCE); } return 0; } + struct TimerData + { + Loop* loop; + uint64_t job_id; + }; + + void + CloseUVTimer(uv_timer_t* timer) + { + // have to delete timer handle this way because libuv. + uv_timer_stop(timer); + uv_close((uv_handle_t*)timer, + [](uv_handle_t* handle) { delete(uv_timer_t*)handle; }); + } + + static void + OnUVTimer(uv_timer_t* timer) + { + TimerData* timer_data = static_cast< TimerData* >(timer->data); + Loop* loop = timer_data->loop; + loop->do_timer_job(timer_data->job_id); + + delete timer_data; + CloseUVTimer(timer); + } + + uint32_t + Loop::call_after_delay(llarp_time_t delay_ms, + std::function< void(void) > callback) + { + PendingTimer timer; + timer.delay_ms = delay_ms; + timer.callback = callback; + timer.job_id = m_nextID++; + uint64_t job_id = timer.job_id; + + m_timerQueue.pushBack(std::move(timer)); + uv_async_send(&m_WakeUp); + + return job_id; + } + + void + Loop::cancel_delayed_call(uint32_t job_id) + { + m_timerCancelQueue.pushBack(job_id); + uv_async_send(&m_WakeUp); + } + + void + Loop::process_timer_queue() + { + while(not m_timerQueue.empty()) + { + PendingTimer job = m_timerQueue.popFront(); + uint64_t job_id = job.job_id; + m_pendingCalls.emplace(job_id, std::move(job.callback)); + + TimerData* timer_data = new TimerData; + timer_data->loop = this; + timer_data->job_id = job_id; + + uv_timer_t* newTimer = new uv_timer_t; + newTimer->data = (void*)timer_data; + + uv_timer_init(&m_Impl, newTimer); + uv_timer_start(newTimer, &OnUVTimer, job.delay_ms, 0); + } + } + + void + Loop::process_cancel_queue() + { + while(not m_timerCancelQueue.empty()) + { + uint64_t job_id = m_timerCancelQueue.popFront(); + auto itr = m_pendingCalls.find(job_id); + if(itr != m_pendingCalls.end()) + { + m_pendingCalls.erase(itr); + } + } + } + + void + Loop::do_timer_job(uint64_t job_id) + { + auto itr = m_pendingCalls.find(job_id); + if(itr != m_pendingCalls.end()) + { + LogicCall(m_Logic, itr->second); + m_pendingCalls.erase(itr); + } + } + void Loop::stop() { @@ -886,18 +1003,18 @@ namespace libuv [](uv_handle_t* h, void*) { if(uv_is_closing(h)) return; - if(h->data && uv_is_active(h)) + if(h->data && uv_is_active(h) && h->type != UV_TIMER) { static_cast< glue* >(h->data)->Close(); } + else if(h->type == UV_TIMER) + { + CloseUVTimer((uv_timer_t*)h); + } }, nullptr); } - Loop::Loop() : llarp_ev_loop(), m_LogicCalls(1024) - { - } - void Loop::stopped() { diff --git a/llarp/ev/ev_libuv.hpp b/llarp/ev/ev_libuv.hpp index 6640edb93d..db8b1520f2 100644 --- a/llarp/ev/ev_libuv.hpp +++ b/llarp/ev/ev_libuv.hpp @@ -9,10 +9,21 @@ #include #include +#include + namespace libuv { struct Loop final : public llarp_ev_loop { + typedef std::function< void(void) > Callback; + + struct PendingTimer + { + uint64_t job_id; + llarp_time_t delay_ms; + Callback callback; + }; + Loop(); bool @@ -37,6 +48,22 @@ namespace libuv int tick(int ms) override; + uint32_t + call_after_delay(llarp_time_t delay_ms, + std::function< void(void) > callback) override; + + void + cancel_delayed_call(uint32_t job_id) override; + + void + process_timer_queue(); + + void + process_cancel_queue(); + + void + do_timer_job(uint64_t job_id); + void stop() override; @@ -104,7 +131,8 @@ namespace libuv private: uv_loop_t m_Impl; - uv_timer_t m_TickTimer; + uv_timer_t* m_TickTimer; + uv_async_t m_WakeUp; std::atomic< bool > m_Run; uv_async_t m_LogicCaller; using AtomicQueue_t = llarp::thread::Queue< std::function< void(void) > >; @@ -114,6 +142,12 @@ namespace libuv uint64_t last_time; uint64_t loop_run_count; #endif + std::atomic< uint32_t > m_nextID; + + std::map< uint32_t, Callback > m_pendingCalls; + + llarp::thread::Queue< PendingTimer > m_timerQueue; + llarp::thread::Queue< uint32_t > m_timerCancelQueue; }; } // namespace libuv diff --git a/llarp/link/server.cpp b/llarp/link/server.cpp index 8f5311e0c8..78be948730 100644 --- a/llarp/link/server.cpp +++ b/llarp/link/server.cpp @@ -7,6 +7,8 @@ #include #include +constexpr uint64_t LINK_LAYER_TICK_INTERVAL = 100; + namespace llarp { static constexpr size_t MaxSessionsPerKey = 16; @@ -289,7 +291,7 @@ namespace llarp { m_Worker = worker; m_Logic = l; - ScheduleTick(100); + ScheduleTick(LINK_LAYER_TICK_INTERVAL); return true; } @@ -450,17 +452,18 @@ namespace llarp } void - ILinkLayer::OnTick(uint64_t interval) + ILinkLayer::OnTick() { auto now = Now(); Tick(now); - ScheduleTick(interval); + ScheduleTick(LINK_LAYER_TICK_INTERVAL); } void ILinkLayer::ScheduleTick(uint64_t interval) { - tick_id = m_Logic->call_later({interval, this, &ILinkLayer::on_timer_tick}); + tick_id = + m_Logic->call_later(interval, std::bind(&ILinkLayer::OnTick, this)); } void diff --git a/llarp/link/server.hpp b/llarp/link/server.hpp index f8bd0ab870..bb9aea0395 100644 --- a/llarp/link/server.hpp +++ b/llarp/link/server.hpp @@ -207,17 +207,8 @@ namespace llarp } private: - static void - on_timer_tick(void* user, uint64_t orig, uint64_t left) - { - // timer cancelled - if(left) - return; - static_cast< ILinkLayer* >(user)->OnTick(orig); - } - void - OnTick(uint64_t interval); + OnTick(); void ScheduleTick(uint64_t interval); diff --git a/llarp/nodedb.hpp b/llarp/nodedb.hpp index 7540f9be5e..acc63495fd 100644 --- a/llarp/nodedb.hpp +++ b/llarp/nodedb.hpp @@ -12,11 +12,6 @@ #include #include -#ifdef _MSC_VER -#include -typedef SSIZE_T ssize_t; -#endif - /** * nodedb.hpp * diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 18c8ca3da9..e7d0a7246d 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -32,9 +32,8 @@ #if defined(ANDROID) || defined(IOS) #include #endif -#if defined(WITH_SYSTEMD) -#include -#endif + +constexpr uint64_t ROUTER_TICK_INTERVAL_MS = 1000; namespace llarp { @@ -284,14 +283,11 @@ namespace llarp } void - Router::handle_router_ticker(void *user, uint64_t orig, uint64_t left) + Router::handle_router_ticker() { - if(left) - return; - auto *self = static_cast< Router * >(user); - self->ticker_job_id = 0; - self->Tick(); - self->ScheduleTicker(orig); + ticker_job_id = 0; + LogicCall(logic(), std::bind(&Router::Tick, this)); + ScheduleTicker(ROUTER_TICK_INTERVAL_MS); } bool @@ -765,7 +761,8 @@ namespace llarp void Router::ScheduleTicker(uint64_t ms) { - ticker_job_id = _logic->call_later({ms, this, &handle_router_ticker}); + ticker_job_id = + _logic->call_later(ms, std::bind(&Router::handle_router_ticker, this)); } void @@ -1036,7 +1033,7 @@ namespace llarp _netloop->add_ticker(std::bind(&Router::PumpLL, this)); - ScheduleTicker(1000); + ScheduleTicker(ROUTER_TICK_INTERVAL_MS); _running.store(true); _startedAt = Now(); #if defined(WITH_SYSTEMD) @@ -1061,20 +1058,18 @@ namespace llarp return 0; } - static void - RouterAfterStopLinks(void *u, uint64_t, uint64_t) + void + Router::AfterStopLinks() { - auto *self = static_cast< Router * >(u); - self->Close(); + Close(); } - static void - RouterAfterStopIssued(void *u, uint64_t, uint64_t) + void + Router::AfterStopIssued() { - auto *self = static_cast< Router * >(u); - self->StopLinks(); - self->nodedb()->AsyncFlushToDisk(); - self->_logic->call_later({200, self, &RouterAfterStopLinks}); + StopLinks(); + nodedb()->AsyncFlushToDisk(); + _logic->call_later(200, std::bind(&Router::AfterStopLinks, this)); } void @@ -1100,7 +1095,7 @@ namespace llarp rpcServer->Stop(); paths.PumpUpstream(); _linkManager.PumpLinks(); - _logic->call_later({200, this, &RouterAfterStopIssued}); + _logic->call_later(200, std::bind(&Router::AfterStopIssued, this)); } bool diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index 71bdf27951..8ac4df0a98 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -466,8 +466,14 @@ namespace llarp bool HasSessionTo(const RouterID &remote) const override; - static void - handle_router_ticker(void *user, uint64_t orig, uint64_t left); + void + handle_router_ticker(); + + void + AfterStopLinks(); + + void + AfterStopIssued(); private: std::atomic< bool > _stopping; diff --git a/llarp/util/aligned.hpp b/llarp/util/aligned.hpp index 95b0b123b7..db0e2130ae 100644 --- a/llarp/util/aligned.hpp +++ b/llarp/util/aligned.hpp @@ -36,6 +36,10 @@ namespace llarp struct alignas(std::max_align_t) AlignedBuffer #endif { + static_assert(sz >= 8, + "AlignedBuffer cannot be used with buffers smaller than 8 " + "bytes"); + static constexpr size_t SIZE = sz; using Data = std::array< byte_t, SIZE >; @@ -269,7 +273,9 @@ namespace llarp size_t operator()(const AlignedBuffer& buf) const { - return *(reinterpret_cast< const size_t* >(buf.data())); + size_t hash; + std::memcpy(&hash, buf.data(), sizeof(hash)); + return hash; } }; diff --git a/llarp/util/thread/logic.cpp b/llarp/util/thread/logic.cpp index 291a2c254c..92d1c2934e 100644 --- a/llarp/util/thread/logic.cpp +++ b/llarp/util/thread/logic.cpp @@ -1,5 +1,4 @@ #include -#include #include #include #include @@ -8,22 +7,8 @@ namespace llarp { - void - Logic::tick(llarp_time_t now) - { - if(m_Queue) - { - llarp_timer_set_time(m_Timer, now); - if(llarp_timer_should_call(m_Timer)) - m_Queue(std::bind(&llarp_timer_tick_all, m_Timer)); - return; - } - llarp_timer_tick_all_async(m_Timer, m_Thread, now); - } - Logic::Logic(size_t sz) : m_Thread(llarp_init_threadpool(1, "llarp-logic", sz)) - , m_Timer(llarp_init_timer()) { llarp_threadpool_start(m_Thread); /// set thread id @@ -41,7 +26,6 @@ namespace llarp Logic::~Logic() { delete m_Thread; - llarp_free_timer(m_Timer); } size_t @@ -61,8 +45,6 @@ namespace llarp Logic::stop() { llarp::LogDebug("logic thread stop"); - // stop all timers from happening in the future - LogicCall(this, std::bind(&llarp_timer_stop, m_Timer)); // stop all operations on threadpool llarp_threadpool_stop(m_Thread); } @@ -139,32 +121,35 @@ namespace llarp m_Queue([self = this]() { self->m_ID = std::this_thread::get_id(); }); } - void - Logic::call_later(llarp_time_t timeout, std::function< void(void) > func) - { - llarp_timer_call_func_later(m_Timer, timeout, func); - } - uint32_t - Logic::call_later(const llarp_timeout_job& job) + Logic::call_later(llarp_time_t timeout, std::function< void(void) > func) { - llarp_timeout_job j; - j.user = job.user; - j.timeout = job.timeout; - j.handler = job.handler; - return llarp_timer_call_later(m_Timer, j); + auto loop = m_Loop; + if(loop != nullptr) + { + return loop->call_after_delay(timeout, func); + } + return 0; } void Logic::cancel_call(uint32_t id) { - llarp_timer_cancel_job(m_Timer, id); + auto loop = m_Loop; + if(loop != nullptr) + { + loop->cancel_delayed_call(id); + } } void Logic::remove_call(uint32_t id) { - llarp_timer_remove_job(m_Timer, id); + auto loop = m_Loop; + if(loop != nullptr) + { + loop->cancel_delayed_call(id); + } } bool @@ -173,4 +158,16 @@ namespace llarp return m_ID.value() == std::this_thread::get_id(); } + void + Logic::set_event_loop(llarp_ev_loop* loop) + { + m_Loop = loop; + } + + void + Logic::clear_event_loop() + { + m_Loop = nullptr; + } + } // namespace llarp diff --git a/llarp/util/thread/logic.hpp b/llarp/util/thread/logic.hpp index 1f16e9bad9..2e29622f01 100644 --- a/llarp/util/thread/logic.hpp +++ b/llarp/util/thread/logic.hpp @@ -1,9 +1,9 @@ #ifndef LLARP_LOGIC_HPP #define LLARP_LOGIC_HPP +#include #include #include -#include #include namespace llarp @@ -15,10 +15,6 @@ namespace llarp ~Logic(); - /// trigger times as needed - void - tick(llarp_time_t now); - /// stop all operation and wait for that to die void stop(); @@ -31,9 +27,6 @@ namespace llarp int lineo); uint32_t - call_later(const llarp_timeout_job& job); - - void call_later(llarp_time_t later, std::function< void(void) > func); void @@ -51,10 +44,16 @@ namespace llarp void SetQueuer(std::function< void(std::function< void(void) >) > q); + void + set_event_loop(llarp_ev_loop* loop); + + void + clear_event_loop(); + private: using ID_t = std::thread::id; llarp_threadpool* const m_Thread; - llarp_timer_context* const m_Timer; + llarp_ev_loop* m_Loop = nullptr; absl::optional< ID_t > m_ID; util::ContentionKiller m_Killer; std::function< void(std::function< void(void) >) > m_Queue; diff --git a/llarp/util/thread/timer.cpp b/llarp/util/thread/timer.cpp deleted file mode 100644 index c1003fc755..0000000000 --- a/llarp/util/thread/timer.cpp +++ /dev/null @@ -1,327 +0,0 @@ -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -namespace llarp -{ - struct timer - { - void* user; - uint64_t called_at; - uint64_t started; - uint64_t timeout; - llarp_timer_handler_func func; - std::function< void(void) > deferredFunc; - bool done; - bool canceled; - - timer(llarp_time_t now, uint64_t ms = 0, void* _user = nullptr, - llarp_timer_handler_func _func = nullptr) - : user(_user) - , called_at(0) - , started(now) - , timeout(ms) - , func(std::move(_func)) - , done(false) - , canceled(false) - { - } - - ~timer() = default; - - void - exec(); - - static void - call(void* user) - { - static_cast< timer* >(user)->exec(); - } - }; -} // namespace llarp - -struct llarp_timer_context -{ - llarp::util::Mutex timersMutex; // protects timers - std::unordered_map< uint32_t, std::unique_ptr< llarp::timer > > timers - GUARDED_BY(timersMutex); - llarp::util::Mutex tickerMutex; - std::unique_ptr< llarp::util::Condition > ticker; - absl::Duration nextTickLen = absl::Milliseconds(100); - - llarp_time_t m_Now; - llarp_time_t m_NextRequiredTickAt = - std::numeric_limits< llarp_time_t >::max(); - size_t m_NumPendingTimers; - - llarp_timer_context() - { - m_Now = llarp::time_now_ms(); - } - - uint32_t currentId = 0; - bool _run = true; - - ~llarp_timer_context() = default; - - bool - run() - { - return _run; - } - - void - stop() - { - _run = false; - } - - void - cancel(uint32_t id) LOCKS_EXCLUDED(timersMutex) - { - llarp::util::Lock lock(&timersMutex); - const auto& itr = timers.find(id); - if(itr == timers.end()) - return; - itr->second->canceled = true; - } - - void - remove(uint32_t id) LOCKS_EXCLUDED(timersMutex) - { - llarp::util::Lock lock(&timersMutex); - const auto& itr = timers.find(id); - if(itr == timers.end()) - return; - itr->second->func = nullptr; - itr->second->canceled = true; - } - - uint32_t - call_later(void* user, llarp_timer_handler_func func, uint64_t timeout_ms) - LOCKS_EXCLUDED(timersMutex) - { - llarp::util::Lock lock(&timersMutex); - - const uint32_t id = ++currentId; - timers.emplace( - id, std::make_unique< llarp::timer >(m_Now, timeout_ms, user, func)); - m_NextRequiredTickAt = std::min(m_NextRequiredTickAt, m_Now + timeout_ms); - m_NumPendingTimers = timers.size(); - return id; - } - - uint32_t - call_func_later(std::function< void(void) > func, llarp_time_t timeout_ms) - { - llarp::util::Lock lock(&timersMutex); - - const uint32_t id = ++currentId; - timers.emplace( - id, - std::make_unique< llarp::timer >(m_Now, timeout_ms, nullptr, nullptr)); - timers[id]->deferredFunc = func; - m_NextRequiredTickAt = std::min(m_NextRequiredTickAt, m_Now + timeout_ms); - m_NumPendingTimers = timers.size(); - return id; - } - - void - cancel_all() LOCKS_EXCLUDED(timersMutex) - { - { - llarp::util::Lock lock(&timersMutex); - - for(auto& item : timers) - { - item.second->func = nullptr; - item.second->canceled = true; - } - } - } - - bool - ShouldTriggerTimers(llarp_time_t peekAhead) const - { - return m_NumPendingTimers > 0 - and (m_Now + peekAhead) >= m_NextRequiredTickAt; - } -}; - -struct llarp_timer_context* -llarp_init_timer() -{ - return new llarp_timer_context(); -} - -uint32_t -llarp_timer_call_later(struct llarp_timer_context* t, - struct llarp_timeout_job job) -{ - return t->call_later(job.user, job.handler, job.timeout); -} - -uint32_t -llarp_timer_call_func_later(struct llarp_timer_context* t, llarp_time_t timeout, - std::function< void(void) > func) -{ - return t->call_func_later(func, timeout); -} - -void -llarp_free_timer(struct llarp_timer_context* t) -{ - delete t; -} - -void -llarp_timer_remove_job(struct llarp_timer_context* t, uint32_t id) -{ - t->remove(id); -} - -void -llarp_timer_stop(struct llarp_timer_context* t) -{ - llarp::LogDebug("timers stopping"); - // destroy all timers - // don't call callbacks on timers - { - llarp::util::Lock lock(&t->timersMutex); - t->timers.clear(); - t->stop(); - } - if(t->ticker) - t->ticker->SignalAll(); - llarp::LogDebug("timers stopped"); -} - -void -llarp_timer_cancel_job(struct llarp_timer_context* t, uint32_t id) -{ - t->cancel(id); -} - -void -llarp_timer_set_time(struct llarp_timer_context* t, llarp_time_t now) -{ - if(now == 0) - now = llarp::time_now_ms(); - t->m_Now = now; -} - -void -llarp_timer_tick_all(struct llarp_timer_context* t) -{ - if(!t->run()) - return; - const auto now = llarp::time_now_ms(); - t->m_Now = now; - std::list< std::unique_ptr< llarp::timer > > hit; - { - llarp::util::Lock lock(&t->timersMutex); - auto itr = t->timers.begin(); - while(itr != t->timers.end()) - { - if(now - itr->second->started >= itr->second->timeout - || itr->second->canceled) - { - // timer hit - hit.emplace_back(std::move(itr->second)); - itr = t->timers.erase(itr); - } - else - { - ++itr; - } - } - } - while(not hit.empty()) - { - const auto& h = hit.front(); - h->called_at = now; - h->exec(); - hit.pop_front(); - } - // reindex next tick info - { - llarp::util::Lock lock(&t->timersMutex); - t->m_Now = now; - t->m_NextRequiredTickAt = std::numeric_limits< llarp_time_t >::max(); - for(const auto& item : t->timers) - { - t->m_NextRequiredTickAt = - std::min(t->m_NextRequiredTickAt, item.second->timeout + t->m_Now); - } - t->m_NumPendingTimers = t->timers.size(); - } -} - -bool -llarp_timer_should_call(struct llarp_timer_context* t) -{ - return t->ShouldTriggerTimers(0); -} - -void -llarp_timer_tick_all_async(struct llarp_timer_context* t, - struct llarp_threadpool* pool, llarp_time_t now) -{ - llarp_timer_set_time(t, now); - if(t->ShouldTriggerTimers(0)) - llarp_threadpool_queue_job(pool, std::bind(&llarp_timer_tick_all, t)); -} - -void -llarp_timer_run(struct llarp_timer_context* t, struct llarp_threadpool* pool) -{ - t->ticker = std::make_unique< llarp::util::Condition >(); - while(t->run()) - { - // wait for timer mutex - if(t->ticker) - { - llarp::util::Lock lock(&t->tickerMutex); - t->ticker->WaitWithTimeout(&t->tickerMutex, t->nextTickLen); - } - - if(t->run()) - { - llarp::util::Lock lock(&t->timersMutex); - // we woke up - llarp_timer_tick_all_async(t, pool, llarp::time_now_ms()); - } - } -} - -namespace llarp -{ - void - timer::exec() - { - if(func) - { - auto diff = called_at - started; - // zero out function pointer before call to prevent multiple calls being - // queued if call takes longer than 1 timer tick - auto call = func; - func = nullptr; - if(diff >= timeout) - call(user, timeout, 0); - else - call(user, timeout, diff); - } - if(deferredFunc && not canceled) - deferredFunc(); - deferredFunc = nullptr; - done = true; - } -} // namespace llarp diff --git a/llarp/util/thread/timer.hpp b/llarp/util/thread/timer.hpp deleted file mode 100644 index 4e162bcbd0..0000000000 --- a/llarp/util/thread/timer.hpp +++ /dev/null @@ -1,68 +0,0 @@ -#ifndef LLARP_TIMER_HPP -#define LLARP_TIMER_HPP - -#include -#include -#include - -#include - -/** called with userptr, original timeout, left */ -using llarp_timer_handler_func = - std::function< void(void *, uint64_t, uint64_t) >; - -struct llarp_timeout_job -{ - uint64_t timeout; - void *user; - llarp_timer_handler_func handler; -}; - -struct llarp_timer_context; - -struct llarp_timer_context * -llarp_init_timer(); - -uint32_t -llarp_timer_call_later(struct llarp_timer_context *t, - struct llarp_timeout_job job); - -uint32_t -llarp_timer_call_func_later(llarp_timer_context *t, llarp_time_t timeout, - std::function< void(void) > func); - -void -llarp_timer_cancel_job(struct llarp_timer_context *t, uint32_t id); - -void -llarp_timer_remove_job(struct llarp_timer_context *t, uint32_t id); - -bool -llarp_timer_should_call(struct llarp_timer_context *t); - -// cancel all -void -llarp_timer_stop(struct llarp_timer_context *t); - -/// set timer's timestamp, if now is 0 use the current time from system clock, -/// llarp_time_t now -void -llarp_timer_set_time(struct llarp_timer_context *t, llarp_time_t now); - -// blocking run timer and send events to thread pool -void -llarp_timer_run(struct llarp_timer_context *t, struct llarp_threadpool *pool); - -/// single threaded run timer, tick all timers -void -llarp_timer_tick_all(struct llarp_timer_context *t); - -/// tick all timers into a threadpool asynchronously -void -llarp_timer_tick_all_async(struct llarp_timer_context *t, - struct llarp_threadpool *pool, llarp_time_t now); - -void -llarp_free_timer(struct llarp_timer_context *t); - -#endif diff --git a/test/test_libabyss.cpp b/test/test_libabyss.cpp index 28625426e7..5bc384d584 100644 --- a/test/test_libabyss.cpp +++ b/test/test_libabyss.cpp @@ -29,20 +29,6 @@ struct AbyssTestBase : public ::testing::Test ASSERT_EQ(meth, method); } - static void - CancelIt(void* u, ABSL_ATTRIBUTE_UNUSED uint64_t orig, uint64_t left) - { - if(left) - return; - static_cast< AbyssTestBase* >(u)->Stop(); - } - - static void - StopIt(void* u) - { - static_cast< AbyssTestBase* >(u)->Stop(); - } - void Start() { @@ -59,7 +45,7 @@ struct AbyssTestBase : public ::testing::Test if(server->ServeAsync(loop, logic, a)) { client->RunAsync(loop, a.ToString()); - logic->call_later({1000, this, &CancelIt}); + logic->call_later(1000, std::bind(&AbyssTestBase::Stop, this)); return; } std::this_thread::sleep_for(std::chrono::seconds(1)); @@ -76,7 +62,7 @@ struct AbyssTestBase : public ::testing::Test void AsyncStop() { - logic->queue_job({this, &StopIt}); + LogicCall(logic, std::bind(&AbyssTestBase::Stop, this)); } ~AbyssTestBase() @@ -159,16 +145,10 @@ struct AbyssTest : public AbyssTestBase, return new ServerHandler(impl, this); } - static void - FlushIt(void* u) - { - static_cast< AbyssTest* >(u)->Flush(); - } - void AsyncFlush() { - logic->queue_job({this, &FlushIt}); + LogicCall(logic, std::bind(&AbyssTest::Flush, this)); } void diff --git a/test/util/test_llarp_util_aligned.cpp b/test/util/test_llarp_util_aligned.cpp index e22732b309..3f87821438 100644 --- a/test/util/test_llarp_util_aligned.cpp +++ b/test/util/test_llarp_util_aligned.cpp @@ -7,13 +7,14 @@ #include #include -using TestSizes = ::testing::Types< std::integral_constant< std::size_t, 2 >, - std::integral_constant< std::size_t, 3 >, - std::integral_constant< std::size_t, 4 >, - std::integral_constant< std::size_t, 8 >, +using TestSizes = ::testing::Types< std::integral_constant< std::size_t, 8 >, + std::integral_constant< std::size_t, 12 >, std::integral_constant< std::size_t, 16 >, std::integral_constant< std::size_t, 32 >, - std::integral_constant< std::size_t, 64 > >; + std::integral_constant< std::size_t, 64 >, + std::integral_constant< std::size_t, 77 >, + std::integral_constant< std::size_t, 1024 >, + std::integral_constant< std::size_t, 3333 > >; template < typename T > struct AlignedBufferTest : public ::testing::Test