Skip to content

Commit

Permalink
Do not inherit WebRTCClient
Browse files Browse the repository at this point in the history
  • Loading branch information
erdnaxe committed Apr 4, 2021
1 parent 6078bdd commit 39287dd
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 30 deletions.
8 changes: 4 additions & 4 deletions galene_stream/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ def start(opt: argparse.Namespace):
:param opt: program options
:type opt: argparse.Namespace
"""
client = GaleneClient(opt.group, opt.output, opt.username, opt.password)
client = GaleneClient(opt.input, opt.output, opt.group, opt.username, opt.password)

# Connect and run main even loop
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(client.connect())
event_loop.run_until_complete(client.loop(event_loop, opt.input))
event_loop.run_until_complete(client.loop(event_loop))


def main():
Expand All @@ -43,10 +43,10 @@ def main():
parser.add_argument(
"-i",
"--input",
default="rtmp://localhost:1935/live/test",
required=True,
help=(
'URI to use as GStreamer "uridecodebin" module input, '
'default to "rtmp://localhost:1935/live/test"'
'e.g. "rtmp://localhost:1935/live/test"'
),
)
parser.add_argument(
Expand Down
39 changes: 24 additions & 15 deletions galene_stream/galene.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,26 @@
log = logging.getLogger(__name__)


class GaleneClient(WebRTCClient):
class GaleneClient:
"""Galène protocol implementation."""

def __init__(
self, group: str, server: str, username: str, password=None, identifier=None
self,
input_uri: str,
server: str,
group: str,
username: str,
password=None,
identifier=None,
):
"""Create GaleneClient
:param group: group to join
:type group: str
:param input_uri: URI for GStreamer uridecodebin
:type input_uri: str
:param server: websocket url to connect to
:type server: str
:param group: group to join
:type group: str
:param username: group user name
:type username: str
:param password: group user password if required
Expand All @@ -41,13 +49,16 @@ def __init__(
# Create random client id
identifier = secrets.token_bytes(16).hex()

self.group = group
self.server = server
self.group = group
self.username = username
self.password = password
self.client_id = identifier
self.conn = None
self.ice_servers = None
self.webrtc = WebRTCClient(
input_uri, self.send_sdp_offer, self.send_ice_candidate
)

async def send(self, message: dict):
"""Send message to remote.
Expand Down Expand Up @@ -115,29 +126,27 @@ async def connect(self):
"password": self.password,
}
await self.send(msg)
response = None
while response is None or response["type"] != "joined":
response = {"type": "none"}
while response["type"] != "joined":
# The server will send 'user' messages that we ignore
response = await self.conn.recv()
response = json.loads(response)
if response["kind"] != "join":
raise RuntimeError("failed to join room")
self.ice_servers = response.get("rtcConfiguration").get("iceServers", [])

async def loop(self, event_loop, input_uri: str) -> int:
async def loop(self, event_loop) -> int:
"""Client loop
:param event_loop: asyncio event loop
:type event_loop: EventLoop
:param input_uri: URI for GStreamer uridecodebin
:type input_uri: str
:raises RuntimeError: if client is not connected
:return: exit code
:rtype: int
"""
if self.conn is None:
raise RuntimeError("client not connected")
self.start_pipeline(event_loop, self.ice_servers, input_uri)
self.webrtc.start_pipeline(event_loop, self.ice_servers)
log.info("Waiting for incoming stream...")

async for message in self.conn:
Expand All @@ -153,16 +162,16 @@ async def loop(self, event_loop, input_uri: str) -> int:
elif message["type"] == "answer":
# Server is sending a SDP offer
sdp = message.get("sdp")
self.set_remote_sdp(sdp)
self.webrtc.set_remote_sdp(sdp)
elif message["type"] == "ice":
# Server is sending trickle ICE candidates
log.debug("Receiving new ICE candidate from remote")
mline_index = message.get("candidate").get("sdpMLineIndex")
candidate = message.get("candidate").get("candidate")
self.add_ice_candidate(mline_index, candidate)
self.webrtc.add_ice_candidate(mline_index, candidate)
elif message["type"] == "renegotiate":
# Server is asking to renegotiate WebRTC session
self.on_negotiation_needed(self.webrtc)
self.webrtc.on_negotiation_needed(self.webrtc)
elif message["type"] == "usermessage":
value = message.get("value")
if message["kind"] == "error":
Expand All @@ -180,4 +189,4 @@ async def loop(self, event_loop, input_uri: str) -> int:
# Oh no! We receive something not implemented
log.warn(f"Not implemented {message}")

self.close_pipeline()
self.webrtc.close_pipeline()
28 changes: 17 additions & 11 deletions galene_stream/webrtc.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,26 @@ class WebRTCClient:
Based on <https://gitlab.freedesktop.org/gstreamer/gst-examples/>.
"""

def __init__(self):
"""Init WebRTCClient."""
def __init__(self, input_uri: str, sdp_offer_callback, ice_candidate_callback):
"""Init WebRTCClient.
:param input_uri: URI for GStreamer uridecodebin
:type input_uri: str
:param sdp_offer_callback: function to send SDP offer
:type sdp_offer_callback: function
:param ice_candidate_callback: function to send ICE candidate
:type ice_candidate_callback: function
"""
self.event_loop = None
self.pipe = None
self.webrtc = None
self.input_uri = ""
self.sdp_offer_callback = sdp_offer_callback
self.ice_candidate_callback = ice_candidate_callback

# webrtcbin latency parameter was added in gstreamer 1.18
self.pipeline_desc = (
"webrtcbin name=send bundle-policy=max-bundle latency=500 "
"uridecodebin uri={input_uri} name=bin "
f"uridecodebin uri={input_uri} name=bin "
"bin. ! vp8enc deadline=1 keyframe-max-dist=5 target-bitrate=5000000 ! rtpvp8pay pt=97 ! send. "
"bin. ! audioconvert ! audioresample ! opusenc ! rtpopuspay pt=96 ! send."
)
Expand Down Expand Up @@ -87,7 +96,7 @@ def on_offer_created(self, promise, _, __):
# Send local SDP offer to remote
offer = offer.sdp.as_text()
future = asyncio.run_coroutine_threadsafe(
self.send_sdp_offer(offer), self.event_loop
self.sdp_offer_callback(offer), self.event_loop
)
future.result() # wait

Expand Down Expand Up @@ -117,7 +126,7 @@ def on_ice_candidate(self, _, mline_index, candidate: str):
"""
candidate = {"candidate": candidate, "sdpMLineIndex": mline_index}
future = asyncio.run_coroutine_threadsafe(
self.send_ice_candidate(candidate), self.event_loop
self.ice_candidate_callback(candidate), self.event_loop
)
future.result() # wait

Expand Down Expand Up @@ -147,20 +156,17 @@ def add_ice_candidate(self, mline_index: int, candidate: str):
"""
self.webrtc.emit("add-ice-candidate", mline_index, candidate)

def start_pipeline(self, event_loop, ice_servers, input_uri):
def start_pipeline(self, event_loop, ice_servers):
"""Start gstreamer pipeline and connect WebRTC events.
:param event_loop: asyncio event loop
:type event_loop: EventLoop
:param ice_servers: list of ICE TURN servers
:type ice_servers: list of dicts
:param input_uri: URI for GStreamer uridecodebin
:type input_uri: str
"""
log.info("Starting pipeline")
self.event_loop = event_loop
self.input_uri = input_uri
self.pipe = Gst.parse_launch(self.pipeline_desc.format(input_uri=input_uri))
self.pipe = Gst.parse_launch(self.pipeline_desc)
self.webrtc = self.pipe.get_by_name("send")
self.webrtc.connect("on-negotiation-needed", self.on_negotiation_needed)
self.webrtc.connect("on-ice-candidate", self.on_ice_candidate)
Expand Down

0 comments on commit 39287dd

Please sign in to comment.