diff --git a/galene_stream/__main__.py b/galene_stream/__main__.py index abdf2ed..718a646 100644 --- a/galene_stream/__main__.py +++ b/galene_stream/__main__.py @@ -19,12 +19,12 @@ def start(opt: argparse.Namespace): :param opt: program options :type opt: argparse.Namespace """ - client = GaleneClient(opt.group, opt.output, opt.username, opt.password) + client = GaleneClient(opt.input, opt.output, opt.group, opt.username, opt.password) # Connect and run main even loop event_loop = asyncio.get_event_loop() event_loop.run_until_complete(client.connect()) - event_loop.run_until_complete(client.loop(event_loop, opt.input)) + event_loop.run_until_complete(client.loop(event_loop)) def main(): @@ -43,10 +43,10 @@ def main(): parser.add_argument( "-i", "--input", - default="rtmp://localhost:1935/live/test", + required=True, help=( 'URI to use as GStreamer "uridecodebin" module input, ' - 'default to "rtmp://localhost:1935/live/test"' + 'e.g. "rtmp://localhost:1935/live/test"' ), ) parser.add_argument( diff --git a/galene_stream/galene.py b/galene_stream/galene.py index 6e44f37..a23bee0 100644 --- a/galene_stream/galene.py +++ b/galene_stream/galene.py @@ -17,18 +17,26 @@ log = logging.getLogger(__name__) -class GaleneClient(WebRTCClient): +class GaleneClient: """Galène protocol implementation.""" def __init__( - self, group: str, server: str, username: str, password=None, identifier=None + self, + input_uri: str, + server: str, + group: str, + username: str, + password=None, + identifier=None, ): """Create GaleneClient - :param group: group to join - :type group: str + :param input_uri: URI for GStreamer uridecodebin + :type input_uri: str :param server: websocket url to connect to :type server: str + :param group: group to join + :type group: str :param username: group user name :type username: str :param password: group user password if required @@ -41,13 +49,16 @@ def __init__( # Create random client id identifier = secrets.token_bytes(16).hex() - self.group = group self.server = server + self.group = group self.username = username self.password = password self.client_id = identifier self.conn = None self.ice_servers = None + self.webrtc = WebRTCClient( + input_uri, self.send_sdp_offer, self.send_ice_candidate + ) async def send(self, message: dict): """Send message to remote. @@ -115,8 +126,8 @@ async def connect(self): "password": self.password, } await self.send(msg) - response = None - while response is None or response["type"] != "joined": + response = {"type": "none"} + while response["type"] != "joined": # The server will send 'user' messages that we ignore response = await self.conn.recv() response = json.loads(response) @@ -124,20 +135,18 @@ async def connect(self): raise RuntimeError("failed to join room") self.ice_servers = response.get("rtcConfiguration").get("iceServers", []) - async def loop(self, event_loop, input_uri: str) -> int: + async def loop(self, event_loop) -> int: """Client loop :param event_loop: asyncio event loop :type event_loop: EventLoop - :param input_uri: URI for GStreamer uridecodebin - :type input_uri: str :raises RuntimeError: if client is not connected :return: exit code :rtype: int """ if self.conn is None: raise RuntimeError("client not connected") - self.start_pipeline(event_loop, self.ice_servers, input_uri) + self.webrtc.start_pipeline(event_loop, self.ice_servers) log.info("Waiting for incoming stream...") async for message in self.conn: @@ -153,16 +162,16 @@ async def loop(self, event_loop, input_uri: str) -> int: elif message["type"] == "answer": # Server is sending a SDP offer sdp = message.get("sdp") - self.set_remote_sdp(sdp) + self.webrtc.set_remote_sdp(sdp) elif message["type"] == "ice": # Server is sending trickle ICE candidates log.debug("Receiving new ICE candidate from remote") mline_index = message.get("candidate").get("sdpMLineIndex") candidate = message.get("candidate").get("candidate") - self.add_ice_candidate(mline_index, candidate) + self.webrtc.add_ice_candidate(mline_index, candidate) elif message["type"] == "renegotiate": # Server is asking to renegotiate WebRTC session - self.on_negotiation_needed(self.webrtc) + self.webrtc.on_negotiation_needed(self.webrtc) elif message["type"] == "usermessage": value = message.get("value") if message["kind"] == "error": @@ -180,4 +189,4 @@ async def loop(self, event_loop, input_uri: str) -> int: # Oh no! We receive something not implemented log.warn(f"Not implemented {message}") - self.close_pipeline() + self.webrtc.close_pipeline() diff --git a/galene_stream/webrtc.py b/galene_stream/webrtc.py index 71cf430..c17b60d 100644 --- a/galene_stream/webrtc.py +++ b/galene_stream/webrtc.py @@ -29,17 +29,26 @@ class WebRTCClient: Based on . """ - def __init__(self): - """Init WebRTCClient.""" + def __init__(self, input_uri: str, sdp_offer_callback, ice_candidate_callback): + """Init WebRTCClient. + + :param input_uri: URI for GStreamer uridecodebin + :type input_uri: str + :param sdp_offer_callback: function to send SDP offer + :type sdp_offer_callback: function + :param ice_candidate_callback: function to send ICE candidate + :type ice_candidate_callback: function + """ self.event_loop = None self.pipe = None self.webrtc = None - self.input_uri = "" + self.sdp_offer_callback = sdp_offer_callback + self.ice_candidate_callback = ice_candidate_callback # webrtcbin latency parameter was added in gstreamer 1.18 self.pipeline_desc = ( "webrtcbin name=send bundle-policy=max-bundle latency=500 " - "uridecodebin uri={input_uri} name=bin " + f"uridecodebin uri={input_uri} name=bin " "bin. ! vp8enc deadline=1 keyframe-max-dist=5 target-bitrate=5000000 ! rtpvp8pay pt=97 ! send. " "bin. ! audioconvert ! audioresample ! opusenc ! rtpopuspay pt=96 ! send." ) @@ -87,7 +96,7 @@ def on_offer_created(self, promise, _, __): # Send local SDP offer to remote offer = offer.sdp.as_text() future = asyncio.run_coroutine_threadsafe( - self.send_sdp_offer(offer), self.event_loop + self.sdp_offer_callback(offer), self.event_loop ) future.result() # wait @@ -117,7 +126,7 @@ def on_ice_candidate(self, _, mline_index, candidate: str): """ candidate = {"candidate": candidate, "sdpMLineIndex": mline_index} future = asyncio.run_coroutine_threadsafe( - self.send_ice_candidate(candidate), self.event_loop + self.ice_candidate_callback(candidate), self.event_loop ) future.result() # wait @@ -147,20 +156,17 @@ def add_ice_candidate(self, mline_index: int, candidate: str): """ self.webrtc.emit("add-ice-candidate", mline_index, candidate) - def start_pipeline(self, event_loop, ice_servers, input_uri): + def start_pipeline(self, event_loop, ice_servers): """Start gstreamer pipeline and connect WebRTC events. :param event_loop: asyncio event loop :type event_loop: EventLoop :param ice_servers: list of ICE TURN servers :type ice_servers: list of dicts - :param input_uri: URI for GStreamer uridecodebin - :type input_uri: str """ log.info("Starting pipeline") self.event_loop = event_loop - self.input_uri = input_uri - self.pipe = Gst.parse_launch(self.pipeline_desc.format(input_uri=input_uri)) + self.pipe = Gst.parse_launch(self.pipeline_desc) self.webrtc = self.pipe.get_by_name("send") self.webrtc.connect("on-negotiation-needed", self.on_negotiation_needed) self.webrtc.connect("on-ice-candidate", self.on_ice_candidate)