Skip to content

Commit

Permalink
Move horus UDP packet generator to this repo
Browse files Browse the repository at this point in the history
  • Loading branch information
darksidelemm committed Jul 11, 2020
1 parent 6113db1 commit c7c3810
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 2 deletions.
2 changes: 1 addition & 1 deletion horusdemodlib/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.1.9"
__version__ = "0.1.10"
91 changes: 91 additions & 0 deletions horusdemodlib/horusudp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#!/usr/bin/env python
#
# Horus Telemetry GUI - Horus UDP
#
# Mark Jessop <[email protected]>
#
import datetime
import json
import logging
import socket


def send_payload_summary(telemetry, port=55672, comment="HorusDemodLib"):
""" Send a payload summary message into the network via UDP broadcast.
Args:
telemetry (dict): Telemetry dictionary to send.
port (int): UDP port to send to.
"""

try:
# Do a few checks before sending.
if telemetry["latitude"] == 0.0 and telemetry["longitude"] == 0.0:
logging.error("Horus UDP - Zero Latitude/Longitude, not sending.")
return

packet = {
"type": "PAYLOAD_SUMMARY",
"callsign": telemetry["callsign"],
"latitude": telemetry["latitude"],
"longitude": telemetry["longitude"],
"altitude": telemetry["altitude"],
"speed": -1,
"heading": -1,
"time": telemetry["time"],
"comment": comment,
"temp": -1,
"sats": -1,
"batt_voltage": -1,
}

if 'snr' in telemetry:
packet['snr'] = telemetry['snr']

# Set up our UDP socket
_s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
_s.settimeout(1)
# Set up socket for broadcast, and allow re-use of the address
_s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
_s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Under OSX we also need to set SO_REUSEPORT to 1
try:
_s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except:
pass

try:
_s.sendto(json.dumps(packet).encode("ascii"), ("<broadcast>", port))
# Catch any socket errors, that may occur when attempting to send to a broadcast address
# when there is no network connected. In this case, re-try and send to localhost instead.
except socket.error as e:
logging.debug(
"Horus UDP - Send to broadcast address failed, sending to localhost instead."
)
_s.sendto(json.dumps(packet).encode("ascii"), ("127.0.0.1", port))

_s.close()

except Exception as e:
logging.error("Horus UDP - Error sending Payload Summary: %s" % str(e))


if __name__ == "__main__":
# Test script for the above functions
from horusdemodlib.decoder import parse_ukhas_string
from horusdemodlib.checksums import ukhas_crc
# Setup Logging
logging.basicConfig(
format="%(asctime)s %(levelname)s: %(message)s", level=logging.INFO
)

sentence = "$$TESTING,1,01:02:03,-34.0,138.0,1000"
crc = ukhas_crc(sentence[2:].encode("ascii"))
sentence = sentence + "*" + crc
print("Sentence: " + sentence)

_decoded = parse_ukhas_string(sentence)
print(_decoded)

send_payload_summary(_decoded)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "horusdemodlib"
version = "0.1.9"
version = "0.1.10"
description = "Project Horus HAB Telemetry Demodulators"
authors = ["Mark Jessop"]
license = "LGPL-2.1-or-later"
Expand Down

0 comments on commit c7c3810

Please sign in to comment.