Skip to content

Commit

Permalink
Merge pull request #2121 from drouhana/rpc-refactor
Browse files Browse the repository at this point in the history
RPC refactor
  • Loading branch information
jagerman authored Feb 1, 2023
2 parents 7ae1a1a + 9bfe881 commit 7fb3678
Show file tree
Hide file tree
Showing 28 changed files with 2,415 additions and 696 deletions.
99 changes: 99 additions & 0 deletions contrib/omq-rpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#!/usr/bin/env python3

import nacl.bindings as sodium
from nacl.public import PrivateKey
from nacl.signing import SigningKey, VerifyKey
import nacl.encoding
import requests
import zmq
import zmq.utils.z85
import sys
import re
import time
import random
import shutil


context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.setsockopt(zmq.CONNECT_TIMEOUT, 5000)
socket.setsockopt(zmq.HANDSHAKE_IVL, 5000)
#socket.setsockopt(zmq.IMMEDIATE, 1)

if len(sys.argv) > 1 and any(sys.argv[1].startswith(x) for x in ("ipc://", "tcp://", "curve://")):
remote = sys.argv[1]
del sys.argv[1]
else:
remote = "ipc://./rpc.sock"

curve_pubkey = b''
my_privkey, my_pubkey = b'', b''

# If given a curve://whatever/pubkey argument then transform it into 'tcp://whatever' and put the
# 'pubkey' back into argv to be handled below.
if remote.startswith("curve://"):
pos = remote.rfind('/')
pkhex = remote[pos+1:]
remote = "tcp://" + remote[8:pos]
if len(pkhex) != 64 or not all(x in "0123456789abcdefABCDEF" for x in pkhex):
print("curve:// addresses must be in the form curve://HOST:PORT/REMOTE_PUBKEY_HEX", file=sys.stderr)
sys.exit(1)
sys.argv[1:0] = [pkhex]

if len(sys.argv) > 1 and len(sys.argv[1]) == 64 and all(x in "0123456789abcdefABCDEF" for x in sys.argv[1]):
curve_pubkey = bytes.fromhex(sys.argv[1])
del sys.argv[1]
socket.curve_serverkey = curve_pubkey
if len(sys.argv) > 1 and len(sys.argv[1]) == 64 and all(x in "0123456789abcdefABCDEF" for x in sys.argv[1]):
my_privkey = bytes.fromhex(sys.argv[1])
del sys.argv[1]
my_pubkey = zmq.utils.z85.decode(zmq.curve_public(zmq.utils.z85.encode(my_privkey)))
else:
my_privkey = PrivateKey.generate()
my_pubkey = my_privkey.public_key.encode()
my_privkey = my_privkey.encode()

print("No curve client privkey given; generated a random one (pubkey: {}, privkey: {})".format(
my_pubkey.hex(), my_privkey.hex()), file=sys.stderr)
socket.curve_secretkey = my_privkey
socket.curve_publickey = my_pubkey

if not 2 <= len(sys.argv) <= 3 or any(x in y for x in ("--help", "-h") for y in sys.argv[1:]):
print("Usage: {} [ipc:///path/to/sock|tcp://1.2.3.4:5678] [SERVER_CURVE_PUBKEY [LOCAL_CURVE_PRIVKEY]] COMMAND ['JSON']".format(
sys.argv[0]), file=sys.stderr)
sys.exit(1)

beginning_of_time = time.clock_gettime(time.CLOCK_MONOTONIC)

print("Connecting to {}".format(remote), file=sys.stderr)
socket.connect(remote)
to_send = [sys.argv[1].encode(), b'tagxyz123']
to_send += (x.encode() for x in sys.argv[2:])
print("Sending {}".format(to_send[0]), file=sys.stderr)
socket.send_multipart(to_send)
if socket.poll(timeout=5000):
m = socket.recv_multipart()
recv_time = time.clock_gettime(time.CLOCK_MONOTONIC)
if len(m) < 3 or m[0:2] != [b'REPLY', b'tagxyz123']:
print("Received unexpected {}-part reply:".format(len(m)), file=sys.stderr)
for x in m:
print("- {}".format(x))
else: # m[2] is numeric value, m[3] is data part, and will become m[2] <- changed
print("Received reply in {:.6f}s:".format(recv_time - beginning_of_time), file=sys.stderr)
if len(m) < 3:
print("(empty reply data)", file=sys.stderr)
else:
for x in m[2:]:
print("{} bytes data part:".format(len(x)), file=sys.stderr)
if any(x.startswith(y) for y in (b'd', b'l', b'i')) and x.endswith(b'e'):
sys.stdout.buffer.write(x)
else:
print(x.decode(), end="\n\n")

else:
print("Request timed out", file=sys.stderr)
socket.close(linger=0)
sys.exit(1)

# sample usage:
# ./omq-rpc.py ipc://$HOME/.oxen/testnet/oxend.sock 'llarp.get_service_nodes' | jq
100 changes: 74 additions & 26 deletions daemon/lokinet-vpn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
#include <vector>
#include <array>
#include <llarp/net/net.hpp>
#include <string_view>

#include <CLI/App.hpp>
#include <CLI/Formatter.hpp>
#include <CLI/Config.hpp>
#include "oxenmq/address.h"

#ifdef _WIN32
// add the unholy windows headers for iphlpapi
Expand Down Expand Up @@ -56,14 +58,14 @@ OMQ_Request(

namespace
{

struct command_line_options
{
// bool options
bool verbose = false;
bool help = false;
bool vpnUp = false;
bool vpnDown = false;
bool swap = false;
bool printStatus = false;
bool killDaemon = false;

Expand All @@ -73,9 +75,10 @@ namespace
std::string endpoint = "default";
std::string token;
std::optional<std::string> range;
std::vector<std::string> swapExits;

// oxenmq
oxenmq::address rpcURL{"tcp://127.0.0.1:1190"};
oxenmq::address rpcURL{};
oxenmq::LogLevel logLevel = oxenmq::LogLevel::warn;
};

Expand Down Expand Up @@ -109,15 +112,23 @@ main(int argc, char* argv[])

// flags: boolean values in command_line_options struct
cli.add_flag("-v,--verbose", options.verbose, "Verbose");
cli.add_flag("--up", options.vpnUp, "Put VPN up");
cli.add_flag("--down", options.vpnDown, "Put VPN down");
cli.add_flag("--add,--up", options.vpnUp, "Map VPN connection to exit node [--up is deprecated]");
cli.add_flag(
"--remove,--down",
options.vpnDown,
"Unmap VPN connection to exit node [--down is deprecated]");
cli.add_flag("--status", options.printStatus, "Print VPN status and exit");
cli.add_flag("-k,--kill", options.killDaemon, "Kill lokinet daemon");

// options: string values in command_line_options struct
cli.add_option("--exit", options.exitAddress, "Specify exit node address")->capture_default_str();
cli.add_option("--endpoint", options.endpoint, "Endpoint to use")->capture_default_str();
cli.add_option("--token", options.token, "Exit auth token to use")->capture_default_str();
cli.add_option("--token,--auth", options.token, "Exit auth token to use")->capture_default_str();
cli.add_option("--range", options.range, "IP range to map exit to")->capture_default_str();
cli.add_option(
"--swap", options.swapExits, "Exit addresses to swap mapped connection to [old] [new]")
->expected(2)
->capture_default_str();

// options: oxenmq values in command_line_options struct
cli.add_option("--rpc", options.rpc, "Specify RPC URL for lokinet")->capture_default_str();
Expand Down Expand Up @@ -149,16 +160,17 @@ main(int argc, char* argv[])
cli.exit(e);
};

int numCommands = options.vpnUp + options.vpnDown + options.printStatus + options.killDaemon;
int numCommands = options.vpnUp + options.vpnDown + options.printStatus + options.killDaemon
+ (not options.swapExits.empty());

switch (numCommands)
{
case 0:
return exit_error(3, "One of --up/--down/--status/--kill must be specified");
return exit_error(3, "One of --add/--remove/--swap/--status/--kill must be specified");
case 1:
break;
default:
return exit_error(3, "Only one of --up/--down/--status/--kill may be specified");
return exit_error(3, "Only one of --add/--remove/--swap/--status/--kill may be specified");
}

if (options.vpnUp and options.exitAddress.empty())
Expand All @@ -170,12 +182,14 @@ main(int argc, char* argv[])
},
options.logLevel};

options.rpcURL = oxenmq::address{(options.rpc.empty()) ? "tcp://127.0.0.1:1190" : options.rpc};

omq.start();

std::promise<bool> connectPromise;

const auto connectionID = omq.connect_remote(
options.rpc,
options.rpcURL,
[&connectPromise](auto) { connectPromise.set_value(true); },
[&connectPromise](auto, std::string_view msg) {
std::cout << "Failed to connect to lokinet RPC: " << msg << std::endl;
Expand All @@ -188,49 +202,74 @@ main(int argc, char* argv[])

if (options.killDaemon)
{
if (not OMQ_Request(omq, connectionID, "llarp.halt"))
auto maybe_halt = OMQ_Request(omq, connectionID, "llarp.halt");

if (not maybe_halt)
return exit_error("Call to llarp.halt failed");
return 0;

if (auto err_it = maybe_halt->find("error");
err_it != maybe_halt->end() and not err_it.value().is_null())
{
return exit_error("{}", err_it.value());
}
}

if (options.printStatus)
{
const auto maybe_status = OMQ_Request(omq, connectionID, "llarp.status");

if (not maybe_status)
return exit_error("call to llarp.status failed");
return exit_error("Call to llarp.status failed");

try
{
const auto& ep = maybe_status->at("result").at("services").at(options.endpoint);
const auto exitMap = ep.at("exitMap");
if (exitMap.empty())
const auto& ep = maybe_status->at("result").at("services").at(options.endpoint).at("exitMap");

if (ep.empty())
{
std::cout << "no exits" << std::endl;
std::cout << "No exits found" << std::endl;
}
else
{
for (const auto& [range, exit] : exitMap.items())
for (const auto& [range, exit] : ep.items())
{
std::cout << range << " via " << exit.get<std::string>() << std::endl;
}
}
}
catch (std::exception& ex)
{
return exit_error("failed to parse result: {}", ex.what());
return exit_error("Failed to parse result: {}", ex.what());
}
return 0;
}

if (not options.swapExits.empty())
{
nlohmann::json opts{{"exit_addresses", std::move(options.swapExits)}};

auto maybe_swap = OMQ_Request(omq, connectionID, "llarp.swap_exits", std::move(opts));

if (not maybe_swap)
return exit_error("Failed to swap exit node connections");

if (auto err_it = maybe_swap->find("error");
err_it != maybe_swap->end() and not err_it.value().is_null())
{
return exit_error("{}", err_it.value());
}
}

if (options.vpnUp)
{
nlohmann::json opts{{"exit", options.exitAddress}, {"token", options.token}};
nlohmann::json opts{{"address", options.exitAddress}, {"token", options.token}};
if (options.range)
opts["range"] = *options.range;
opts["ip_range"] = *options.range;

auto maybe_result = OMQ_Request(omq, connectionID, "llarp.exit", std::move(opts));
auto maybe_result = OMQ_Request(omq, connectionID, "llarp.map_exit", std::move(opts));

if (not maybe_result)
return exit_error("could not add exit");
return exit_error("Could not add exit");

if (auto err_it = maybe_result->find("error");
err_it != maybe_result->end() and not err_it.value().is_null())
Expand All @@ -240,11 +279,20 @@ main(int argc, char* argv[])
}
if (options.vpnDown)
{
nlohmann::json opts{{"unmap", true}};
nlohmann::json opts{{"unmap_exit", true}};
if (options.range)
opts["range"] = *options.range;
if (not OMQ_Request(omq, connectionID, "llarp.exit", std::move(opts)))
return exit_error("failed to unmap exit");
opts["ip_range"] = *options.range;

auto maybe_down = OMQ_Request(omq, connectionID, "llarp.unmap_exit", std::move(opts));

if (not maybe_down)
return exit_error("Failed to unmap exit node connection");

if (auto err_it = maybe_down->find("error");
err_it != maybe_down->end() and not err_it.value().is_null())
{
return exit_error("{}", err_it.value());
}
}

return 0;
Expand Down
2 changes: 1 addition & 1 deletion external/oxen-logging
3 changes: 3 additions & 0 deletions llarp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,10 @@ add_library(lokinet-context
# lokinet-rpc holds all rpc related compilation units
add_library(lokinet-rpc
STATIC
rpc/json_binary_proxy.cpp
rpc/json_conversions.cpp
rpc/lokid_rpc_client.cpp
rpc/rpc_request_parser.cpp
rpc/rpc_server.cpp
rpc/endpoint_rpc.cpp
)
Expand Down
Loading

0 comments on commit 7fb3678

Please sign in to comment.