diff --git a/libs/gl-testing/gltesting/fixtures.py b/libs/gl-testing/gltesting/fixtures.py index 36ffce1d1..11ee3bbdf 100644 --- a/libs/gl-testing/gltesting/fixtures.py +++ b/libs/gl-testing/gltesting/fixtures.py @@ -23,7 +23,7 @@ from gltesting.network import node_factory from pyln.testing.fixtures import directory as str_directory from decimal import Decimal -from gltesting.grpcweb import GrpcWebProxy +from gltesting.grpcweb import GrpcWebProxy, NodeHandler from clnvm import ClnVersionManager @@ -220,3 +220,16 @@ def grpc_web_proxy(scheduler, grpc_test_server): yield p p.stop() + + +@pytest.fixture +def node_grpc_web_proxy(scheduler): + """A grpc-web proxy that knows how to talk to nodes. + """ + p = GrpcWebProxy(scheduler=scheduler, grpc_port=0) + p.handler_cls = NodeHandler + p.start() + + yield p + + p.stop() diff --git a/libs/gl-testing/gltesting/grpcweb.py b/libs/gl-testing/gltesting/grpcweb.py index 2a7588744..05a668dff 100644 --- a/libs/gl-testing/gltesting/grpcweb.py +++ b/libs/gl-testing/gltesting/grpcweb.py @@ -12,6 +12,10 @@ from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler import logging import struct +import httpx +from dataclasses import dataclass +from typing import Dict +import ssl class GrpcWebProxy(object): @@ -26,6 +30,7 @@ def __init__(self, scheduler: Scheduler, grpc_port: int): self.logger.info( f"GrpcWebProxy configured to forward requests from web_port={self.web_port} to grpc_port={self.grpc_port}" ) + self.handler_cls = Handler def start(self): self._thread = Thread(target=self.run, daemon=True) @@ -33,8 +38,12 @@ def start(self): self.running = True server_address = ("127.0.0.1", self.web_port) - self.httpd = ThreadingHTTPServer(server_address, Handler) + self.httpd = ThreadingHTTPServer(server_address, self.handler_cls) self.httpd.grpc_port = self.grpc_port + + # Just a simple way to pass the scheduler to the handler + self.httpd.scheduler = self.scheduler + self.logger.debug(f"Server startup complete") self._thread.start() @@ -47,11 +56,49 @@ def stop(self): self._thread.join() +@dataclass +class Request: + body: bytes + headers: Dict[str, str] + flags: int + length: int + + +@dataclass +class Response: + body: bytes + + class Handler(BaseHTTPRequestHandler): def __init__(self, *args, **kwargs): self.logger = logging.getLogger("gltesting.grpcweb.Handler") BaseHTTPRequestHandler.__init__(self, *args, **kwargs) + def proxy(self, request) -> Response: + """Callback called with the request, implementing the proxying.""" + url = f"http://localhost:{self.server.grpc_port}{self.path}" + self.logger.debug(f"Forwarding request to '{url}'") + headers = { + "te": "trailers", + "Content-Type": "application/grpc", + "grpc-accept-encoding": "identity", + "user-agent": "gl-testing-grpc-web-proxy", + } + content = struct.pack("!cI", request.flags, request.length) + request.body + req = httpx.Request( + "POST", + url, + headers=headers, + content=content, + ) + client = httpx.Client(http1=False, http2=True) + res = client.send(req) + return Response(body=res.content) + + def auth(self, request: Request) -> bool: + """Authenticate the request. True means allow.""" + return True + def do_POST(self): # We don't actually touch the payload, so we do not really # care about the flags ourselves. The upstream sysmte will @@ -69,6 +116,25 @@ def do_POST(self): # need to decode it, and we can treat it as opaque blob. body = self.rfile.read(length) + req = Request(body=body, headers=self.headers, flags=flags, length=length) + if not self.auth(req): + self.wfile.write(b"HTTP/1.1 401 Unauthorized\r\n\r\n") + return + + response = self.proxy(req) + self.wfile.write(b"HTTP/1.0 200 OK\n\n") + self.wfile.write(response.body) + self.wfile.flush() + + +class NodeHandler(Handler): + """A handler that is aware of nodes, their auth and how they schedule.""" + + def __init__(self, *args, **kwargs): + self.logger = logging.getLogger("gltesting.grpcweb.NodeHandler") + BaseHTTPRequestHandler.__init__(self, *args, **kwargs) + + def auth(self, request: Request) -> bool: # TODO extract the `glauthpubkey` and the `glauthsig`, then # verify them. Fail the call if the verification fails, # forward otherwise. @@ -76,33 +142,69 @@ def do_POST(self): # multiplexing support in `h2`, which simplifies this proxy # quite a bit. The production server maintains a cache of # connections and multiplexes correctly. + pk = request.headers.get("glauthpubkey", None) + sig = request.headers.get("glauthsig", None) + ts = request.headers.get("glts", None) - import httpx + if not pk: + self.logger.warn(f"Missing public key header") + return False - url = f"http://localhost:{self.server.grpc_port}{self.path}" - self.logger.debug(f"Forwarding request to '{url}'") + if not sig: + self.logger.warn(f"Missing signature header") + return False + + if not ts: + self.logger.warn(f"Missing timestamp header") + return False + + # TODO Check the signature. + return True + + def proxy(self, request: Request): + # Fetch current location of the node + + pk = request.headers.get("glauthpubkey") + from base64 import b64decode + + pk = b64decode(pk) + + node = self.server.scheduler.get_node(pk) + self.logger.debug(f"Found node for node_id={pk.hex()}") + + # TODO Schedule node if not scheduled + + client_cert = node.identity.private_key + ca_path = node.identity.caroot_path + + # Load TLS client cert info client + ctx = httpx.create_ssl_context( + verify=ca_path, + http2=True, + cert=( + node.identity.cert_chain_path, + node.identity.private_key_path, + ), + ) + client = httpx.Client(http1=False, http2=True, verify=ctx) + + url = f"{node.process.grpc_uri}{self.path}" headers = { "te": "trailers", "Content-Type": "application/grpc", - "grpc-accept-encoding": "idenity", - "user-agent": "My bloody hacked up script", + "grpc-accept-encoding": "identity", + "user-agent": "gl-testing-grpc-web-proxy", } - content = struct.pack("!cI", flags, length) + body + content = struct.pack("!cI", request.flags, request.length) + request.body + + # Forward request req = httpx.Request( "POST", url, headers=headers, content=content, ) - client = httpx.Client(http1=False, http2=True) - - res = client.send(req) res = client.send(req) - canned = b"\n\rheklllo world" - l = struct.pack("!I", len(canned)) - self.wfile.write(b"HTTP/1.0 200 OK\n\n") - self.wfile.write(b"\x00") - self.wfile.write(l) - self.wfile.write(canned) - self.wfile.flush() + # Return response + return Response(body=res.content) diff --git a/libs/gl-testing/gltesting/test_pb2_grpc.py b/libs/gl-testing/gltesting/test_pb2_grpc.py index 395457cd5..916320e1a 100644 --- a/libs/gl-testing/gltesting/test_pb2_grpc.py +++ b/libs/gl-testing/gltesting/test_pb2_grpc.py @@ -16,7 +16,7 @@ def __init__(self, channel): """ self.SayHello = channel.unary_unary( '/gltesting.Greeter/SayHello', - request_serializer=gltesting_dot_test__pb2.HelloRequest.SerializeToString, + request_serializer=gltesting_dot_test__pb2.HelloRequest.SerializeToyString, response_deserializer=gltesting_dot_test__pb2.HelloReply.FromString, ) diff --git a/libs/gl-testing/tests/test_grpc_web.py b/libs/gl-testing/tests/test_grpc_web.py index 6feca29d5..ae2d740f1 100644 --- a/libs/gl-testing/tests/test_grpc_web.py +++ b/libs/gl-testing/tests/test_grpc_web.py @@ -5,6 +5,32 @@ from gltesting.test_pb2_grpc import GreeterStub from gltesting.test_pb2 import HelloRequest import sonora.client +from pyln import grpc as clnpb +from base64 import b64encode +from time import time +import struct +from typing import Any + + +class GrpcWebClient: + """A simple grpc-web client that implements the calling convention.""" + + def __init__(self, node_grpc_web_proxy_uri, node_id: bytes): + self.node_id = node_id + self.node_grpc_web_proxy_uri = node_grpc_web_proxy_uri + self.channel = sonora.client.insecure_web_channel(node_grpc_web_proxy_uri) + self.stub = clnpb.NodeStub(self.channel) + + def call(self, method_name: str, req: Any) -> Any: + ts = struct.pack("!Q", int(time() * 1000)) + metadata = [ + ("glauthpubkey", b64encode(self.node_id).decode("ASCII")), + ("glauthsig", b64encode(b"\x00" * 64).decode("ASCII")), + ("glts", b64encode(ts).decode("ASCII")), + ] + func = self.stub.__dict__.get(method_name) + return func(req, metadata=metadata) + def test_start(grpc_web_proxy): with sonora.client.insecure_web_channel( @@ -14,3 +40,29 @@ def test_start(grpc_web_proxy): req = HelloRequest(name="greenlight") print(stub.SayHello(req)) + +def test_node_grpc_web(scheduler, node_grpc_web_proxy, clients): + """Ensure that the""" + # Start by creating a node + c = clients.new() + c.register(configure=True) + n = c.node() + info = n.get_info() + + # Now extract the TLS certificates, so we can sign the payload. + node_id = info.id + key_path = c.directory / "device-key.pem" + ca_path = c.directory / "ca.pem" + + proxy_uri = f"http://localhost:{node_grpc_web_proxy.web_port}" + web_client = GrpcWebClient(proxy_uri, node_id) + + req = clnpb.GetinfoRequest() + info = web_client.call("Getinfo", req) + print(info) + + # Configure the web client to sign its requests too + + # Issue a request to the node through the proxy. + # Assert the `get_info` call is identical + breakpoint() diff --git a/libs/gl-testing/tests/util/grpcserver.py b/libs/gl-testing/tests/util/grpcserver.py index 901ac6e05..34fbc13fe 100644 --- a/libs/gl-testing/tests/util/grpcserver.py +++ b/libs/gl-testing/tests/util/grpcserver.py @@ -33,4 +33,4 @@ def target(): self.thread.start() def stop(self): - self.inner.aclose + pass