Skip to content

Commit

Permalink
Merge pull request #1060 from loki-project/dev
Browse files Browse the repository at this point in the history
0.6.4
  • Loading branch information
majestrate authored Jan 19, 2020
2 parents 50f4b42 + 5be1865 commit e19714e
Show file tree
Hide file tree
Showing 13 changed files with 148 additions and 31 deletions.
2 changes: 1 addition & 1 deletion llarp/constants/version.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#define LLARP_VERSION_MAJ 0
#define LLARP_VERSION_MIN 6
#define LLARP_VERSION_PATCH 3
#define LLARP_VERSION_PATCH 4

#define LLARP_DEFAULT_NETID "lokinet"

Expand Down
7 changes: 7 additions & 0 deletions llarp/messages/link_intro.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ namespace llarp
{
return "LinkIntro";
}

// always first
uint16_t
Priority() const override
{
return std::numeric_limits< uint16_t >::max();
}
};
} // namespace llarp

Expand Down
7 changes: 7 additions & 0 deletions llarp/messages/link_message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ namespace llarp
// the name of this kind of message
virtual const char*
Name() const = 0;

/// get message prority, higher value means more important
virtual uint16_t
Priority() const
{
return 1;
}
};

} // namespace llarp
Expand Down
11 changes: 11 additions & 0 deletions llarp/messages/relay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ namespace llarp
{
return "RelayUpstream";
}
uint16_t
Priority() const override
{
return 0;
}
};

struct RelayDownstreamMessage : public ILinkMessage
Expand All @@ -56,6 +61,12 @@ namespace llarp
{
return "RelayDownstream";
}

uint16_t
Priority() const override
{
return 0;
}
};
} // namespace llarp

Expand Down
6 changes: 6 additions & 0 deletions llarp/messages/relay_commit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ namespace llarp
{
return "RelayCommit";
}

virtual uint16_t
Priority() const override
{
return 5;
}
};
} // namespace llarp

Expand Down
5 changes: 5 additions & 0 deletions llarp/messages/relay_status.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ namespace llarp
{
return "RelayStatus";
}
virtual uint16_t
Priority() const override
{
return 6;
}
};
} // namespace llarp

Expand Down
61 changes: 46 additions & 15 deletions llarp/router/outbound_message_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace llarp
const ILinkMessage *msg,
SendStatusHandler callback)
{
const uint16_t priority = msg->Priority();
std::array< byte_t, MAX_LINK_MSG_SIZE > linkmsg_buffer;
llarp_buffer_t buf(linkmsg_buffer);

Expand All @@ -40,7 +41,7 @@ namespace llarp

if(_linkManager->HasSessionTo(remote))
{
QueueOutboundMessage(remote, std::move(message), msg->pathid);
QueueOutboundMessage(remote, std::move(message), msg->pathid, priority);
return true;
}

Expand All @@ -53,8 +54,9 @@ namespace llarp
pendingSessionMessageQueues.emplace(remote, MessageQueue());

MessageQueueEntry entry;
entry.message = message;
entry.router = remote;
entry.priority = priority;
entry.message = message;
entry.router = remote;
itr_pair.first->second.push(std::move(entry));

shouldCreateSession = itr_pair.second;
Expand Down Expand Up @@ -89,7 +91,14 @@ namespace llarp
util::StatusObject
OutboundMessageHandler::ExtractStatus() const
{
util::StatusObject status{};
util::StatusObject status{"queueStats",
{{"queued", m_queueStats.queued},
{"dropped", m_queueStats.dropped},
{"sent", m_queueStats.sent},
{"queueWatermark", m_queueStats.queueWatermark},
{"perTickMax", m_queueStats.perTickMax},
{"numTicks", m_queueStats.numTicks}}};

return status;
}

Expand Down Expand Up @@ -201,6 +210,7 @@ namespace llarp
{
const llarp_buffer_t buf(msg.first);
auto callback = msg.second;
m_queueStats.sent++;
return _linkManager->SendTo(
remote, buf, [=](ILinkSession::DeliveryStatus status) {
if(status == ILinkSession::DeliveryStatus::eDeliverySuccess)
Expand All @@ -224,18 +234,29 @@ namespace llarp
bool
OutboundMessageHandler::QueueOutboundMessage(const RouterID &remote,
Message &&msg,
const PathID_t &pathid)
const PathID_t &pathid,
uint16_t priority)
{
MessageQueueEntry entry;
entry.message = std::move(msg);
auto callback_copy = entry.message.second;
entry.router = remote;
entry.pathid = pathid;
entry.priority = priority;
if(outboundQueue.tryPushBack(std::move(entry))
!= llarp::thread::QueueReturn::Success)
{
m_queueStats.dropped++;
DoCallback(callback_copy, SendStatus::Congestion);
}
else
{
m_queueStats.queued++;

uint32_t queueSize = outboundQueue.size();
m_queueStats.queueWatermark =
std::max(queueSize, m_queueStats.queueWatermark);
}

return true;
}
Expand All @@ -257,11 +278,16 @@ namespace llarp
}

MessageQueue &path_queue = itr_pair.first->second;
if(path_queue.size() >= MAX_PATH_QUEUE_SIZE)

if(path_queue.size() < MAX_PATH_QUEUE_SIZE)
{
path_queue.pop(); // head drop
path_queue.push(std::move(entry));
}
else
{
DoCallback(entry.message.second, SendStatus::Congestion);
m_queueStats.dropped++;
}
path_queue.push(std::move(entry));
}
}

Expand All @@ -286,14 +312,15 @@ namespace llarp
void
OutboundMessageHandler::SendRoundRobin()
{
m_queueStats.numTicks++;

// send non-routing messages first priority
auto &non_routing_mq = outboundMessageQueues[zeroID];
while(not non_routing_mq.empty())
{
MessageQueueEntry entry = std::move(non_routing_mq.front());
non_routing_mq.pop();

const MessageQueueEntry &entry = non_routing_mq.top();
Send(entry.router, entry.message);
non_routing_mq.pop();
}

size_t empty_count = 0;
Expand Down Expand Up @@ -329,10 +356,11 @@ namespace llarp
auto &message_queue = outboundMessageQueues[pathid];
if(message_queue.size() > 0)
{
MessageQueueEntry entry = std::move(message_queue.front());
message_queue.pop();
const MessageQueueEntry &entry = message_queue.top();

Send(entry.router, entry.message);
message_queue.pop();

empty_count = 0;
sent_count++;
}
Expand All @@ -349,6 +377,9 @@ namespace llarp
break;
}
}

m_queueStats.perTickMax =
std::max((uint32_t)sent_count, m_queueStats.perTickMax);
}

void
Expand All @@ -372,8 +403,7 @@ namespace llarp

while(!movedMessages.empty())
{
MessageQueueEntry entry = std::move(movedMessages.front());
movedMessages.pop();
const MessageQueueEntry &entry = movedMessages.top();

if(status == SendStatus::Success)
{
Expand All @@ -383,6 +413,7 @@ namespace llarp
{
DoCallback(entry.message.second, status);
}
movedMessages.pop();
}
}

Expand Down
24 changes: 22 additions & 2 deletions llarp/router/outbound_message_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,30 @@ namespace llarp

struct MessageQueueEntry
{
uint16_t priority;
Message message;
PathID_t pathid;
RouterID router;

bool
operator<(const MessageQueueEntry &other) const
{
return other.priority < priority;
}
};

struct MessageQueueStats
{
uint64_t queued = 0;
uint64_t dropped = 0;
uint64_t sent = 0;
uint32_t queueWatermark = 0;

uint32_t perTickMax = 0;
uint32_t numTicks = 0;
};

using MessageQueue = std::queue< MessageQueueEntry >;
using MessageQueue = std::priority_queue< MessageQueueEntry >;

void
OnSessionEstablished(const RouterID &router);
Expand Down Expand Up @@ -91,7 +109,7 @@ namespace llarp

bool
QueueOutboundMessage(const RouterID &remote, Message &&msg,
const PathID_t &pathid);
const PathID_t &pathid, uint16_t priority = 0);

void
ProcessOutboundQueue();
Expand Down Expand Up @@ -128,6 +146,8 @@ namespace llarp
// paths cannot have pathid "0", so it can be used as the "pathid"
// for non-traffic (control) messages, so they can be prioritized.
static const PathID_t zeroID;

MessageQueueStats m_queueStats;
};

} // namespace llarp
Expand Down
22 changes: 17 additions & 5 deletions llarp/router/rc_lookup_handler.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <chrono>
#include <router/rc_lookup_handler.hpp>

#include <link/i_link_manager.hpp>
Expand All @@ -16,6 +17,8 @@
#include <functional>
#include <random>

using namespace std::chrono_literals;

namespace llarp
{
void
Expand Down Expand Up @@ -53,7 +56,7 @@ namespace llarp
RCLookupHandler::HaveReceivedWhitelist()
{
util::Lock l(&_mutex);
return whitelistRouters.empty();
return not whitelistRouters.empty();
}

void
Expand Down Expand Up @@ -113,6 +116,10 @@ namespace llarp
{
FinalizeRequest(router, nullptr, RCRequestResult::RouterNotFound);
}
else
{
_routerLookupTimes[router] = std::chrono::steady_clock::now();
}
}
}

Expand Down Expand Up @@ -248,19 +255,24 @@ namespace llarp

if(useWhitelist)
{
static constexpr size_t LookupPerTick = 25;
static constexpr auto RerequestInterval = 10min;
static constexpr size_t LookupPerTick = 5;

std::vector< RouterID > lookupRouters;
lookupRouters.reserve(LookupPerTick);

const auto now = std::chrono::steady_clock::now();

{
// if we are using a whitelist look up a few routers we don't have
util::Lock l(&_mutex);
for(const auto &r : whitelistRouters)
{
if(_nodedb->Has(r))
continue;
lookupRouters.emplace_back(r);
if(now > _routerLookupTimes[r] + RerequestInterval
and not _nodedb->Has(r))
{
lookupRouters.emplace_back(r);
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions llarp/router/rc_lookup_handler.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef LLARP_RC_LOOKUP_HANDLER_HPP
#define LLARP_RC_LOOKUP_HANDLER_HPP

#include <chrono>
#include <router/i_rc_lookup_handler.hpp>

#include <util/thread/threading.hpp>
Expand Down Expand Up @@ -115,6 +116,10 @@ namespace llarp
bool isServiceNode = false;

std::set< RouterID > whitelistRouters GUARDED_BY(_mutex);

using TimePoint = std::chrono::steady_clock::time_point;
std::unordered_map< RouterID, TimePoint, RouterID::Hash >
_routerLookupTimes;
};

} // namespace llarp
Expand Down
Loading

0 comments on commit e19714e

Please sign in to comment.