Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to use ArduPilot firmware manifest to download SITL binaries #145

Merged
merged 1 commit into from
Aug 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
289 changes: 267 additions & 22 deletions dronekit_sitl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@
import atexit
import select
import psutil
import platform
import tempfile
import gzip

from subprocess import Popen, PIPE
import threading
from threading import Thread
from six.moves.queue import Queue, Empty


sitl_host = 'http://dronekit-assets.s3.amazonaws.com/sitl'
ardupilot_sitl_host = 'http://firmware.ardupilot.org'
sitl_target = os.path.normpath(os.path.expanduser('~/.dronekit/sitl'))

def kill(proc_pid):
Expand Down Expand Up @@ -78,19 +82,270 @@ def readline(self, timeout = None):
class UnexpectedEndOfStream(Exception):
pass

def use_new_sitl_binaries():
(system, node, release, version, machine, processor) = platform.uname()
if system == "Linux" and machine == "x86_64":
return True
return False

def version_list():
if use_new_sitl_binaries():
return version_list_new()
# FIXME: add arm here
return version_list_old()

def version_list_old():
sitl_list = '{}/versions.json'.format(sitl_host)

req = Request(sitl_list, headers={'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'})

raw = urlopen(req).read()

if six.PY3:
raw = raw.decode('utf-8')

versions = json.loads(raw)
# print("versions: %s" % versions)
return versions

def manifest_path():
return os.path.join(sitl_target, "manifest.json.gz")

def debug(message):
print(message, file=sys.stderr)

def version_list_new_manifest(freshen=False):
mpath = manifest_path()
if not freshen:
if os.path.exists(mpath):
if time.time() - os.path.getmtime(mpath) < 86400:
with gzip.open(mpath) as fd:
debug("returning cached manifest")
data = fd.read()
j = json.loads(data)
return j

sitl_list = '{}/manifest.json.gz'.format(ardupilot_sitl_host)
debug("Downloading (%s)" % (sitl_list,))

req = Request(sitl_list, headers={
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
})
raw = urlopen(req).read()

os.makedirs(sitl_target, exist_ok=True)

mpath_tmp = "%s.tmp" % (mpath,)
with open(mpath_tmp, 'wb') as fd:
fd.write(raw)

ret = json.loads(gzip.open(mpath_tmp).read()) # ensure parsable before storing

os.rename(mpath_tmp, mpath)

return ret

def version_list_new(freshen=False):
json = version_list_new_manifest(freshen=freshen)
# form up old-style json from manifest file
(system, node, release, version, machine, processor) = platform.uname()
if system == "Linux" and machine == "x86_64":
required_platform = "SITL_x86_64_linux_gnu"
else:
raise ValueError("Failed to determine required platform")

ret = {}
for f in json["firmware"]:
if f["platform"] != required_platform:
continue
# print("f = %s" % str(f))
v = f["vehicletype"].lower()
if f["mav-type"] == "HELICOPTER":
v = "helicopter"
if v not in ret:
ret[v] = {}
ver = "%s-%s" % (f["mav-firmware-version"], f["mav-firmware-version-type"])
if ver in ret[v]:
raise ValueError("Already have a version %s for %s: (%s), this is (%s)" % (ver, v, ret[v][ver], f))
ret[v][ver] = f

return ret


def download(system, version, target, verbose=False):
if use_new_sitl_binaries():
return download_new(system, version, target, verbose=verbose)
download_old(system, version, target, verbose=verbose)

def download_new(system, version, target, verbose=False):
debug("new download (system=%s) (version=%s) target=(%s)" %
(system, version, target))
entries = version_list()[system]
if target == "linux":
platform = "SITL_x86_64_linux_gnu"
else:
raise Exception("Unknown target (%s)" % target)
version_type = None
major = None
minor = None
patch = None
if version == "stable":
version_type = "OFFICIAL"
else:
# must be dotted thingy
try:
(major, minor, patch) = version.split(".")
except ValueError:
try:
(major, minor) = version.split(".")
except ValueError:
major = version

current_favourite = None
for entry_key in entries:
entry = entries[entry_key]
if entry["platform"] != platform:
continue
if version_type is not None:
if entry["mav-firmware-version-type"] != version_type:
continue
else:
if major is not None:
if entry["mav-firmware-version-major"] != major:
continue
if minor is not None:
if entry["mav-firmware-version-minor"] != minor:
continue
if current_favourite is not None:
if entry["mav-firmware-version-minor"] < current_favourite["mav-firmware-version-minor"]:
continue
if patch is not None:
if entry["mav-firmware-version-patch"] != patch:
continue
if current_favourite is not None:
if entry["mav-firmware-version-patch"] < current_favourite["mav-firmware-version-patch"]:
continue
# print("Entry: %s" % str(entry))
current_favourite = entry

debug("Decided on entry (%s)" % (str(current_favourite),))
target_dir = os.path.join(sitl_target, "%s-%s" % (system, version))
try:
os.makedirs(target_dir, exist_ok=True)
except OSError as e:
print("Got: %s" % str(e))

target_filepath = os.path.join(target_dir, "apm")
download_url_to_filepath(current_favourite["url"], target_filepath)
os.chmod(target_filepath, 0o755)

# fake up some parameters to make SITL generally happy; this is a
# slightly modified default_params/copter.parm
parameters = """
EK2_ENABLE 1
FRAME_TYPE 0
FS_THR_ENABLE 1
BATT_MONITOR 4
COMPASS_LEARN 0
COMPASS_OFS_X 5
COMPASS_OFS_Y 13
COMPASS_OFS_Z -18
COMPASS_OFS2_X 5
COMPASS_OFS2_Y 13
COMPASS_OFS2_Z -18
COMPASS_OFS3_X 5
COMPASS_OFS3_Y 13
COMPASS_OFS3_Z -18
FENCE_RADIUS 150
FRAME_CLASS 1
RC1_MAX 2000.000000
RC1_MIN 1000.000000
RC1_TRIM 1500.000000
RC2_MAX 2000.000000
RC2_MIN 1000.000000
RC2_TRIM 1500.000000
RC3_MAX 2000.000000
RC3_MIN 1000.000000
RC3_TRIM 1500.000000
RC4_MAX 2000.000000
RC4_MIN 1000.000000
RC4_TRIM 1500.000000
RC5_MAX 2000.000000
RC5_MIN 1000.000000
RC5_TRIM 1500.000000
RC6_MAX 2000.000000
RC6_MIN 1000.000000
RC6_TRIM 1500.000000
RC7_MAX 2000.000000
RC7_MIN 1000.000000
RC7_OPTION 7
RC7_TRIM 1500.000000
RC8_MAX 2000.000000
RC8_MIN 1000.000000
RC8_TRIM 1500.000000
FLTMODE1 7
FLTMODE2 9
FLTMODE3 6
FLTMODE4 3
FLTMODE5 5
FLTMODE6 0
FS_GCS_ENABLE 0
SUPER_SIMPLE 0
SIM_GPS_DELAY 1
SIM_ACC_RND 0
SIM_GYR_RND 0
SIM_WIND_SPD 0
SIM_WIND_TURB 0
SIM_BARO_RND 0
SIM_MAG_RND 0
SIM_GPS_GLITCH_X 0
SIM_GPS_GLITCH_Y 0
SIM_GPS_GLITCH_Z 0
# we need small INS_ACC offsets so INS is recognised as being calibrated
INS_ACCOFFS_X 0.001
INS_ACCOFFS_Y 0.001
INS_ACCOFFS_Z 0.001
INS_ACCSCAL_X 1.001
INS_ACCSCAL_Y 1.001
INS_ACCSCAL_Z 1.001
INS_ACC2OFFS_X 0.001
INS_ACC2OFFS_Y 0.001
INS_ACC2OFFS_Z 0.001
INS_ACC2SCAL_X 1.001
INS_ACC2SCAL_Y 1.001
INS_ACC2SCAL_Z 1.001
INS_ACC3OFFS_X 0.000
INS_ACC3OFFS_Y 0.000
INS_ACC3OFFS_Z 0.000
INS_ACC3SCAL_X 1.000
INS_ACC3SCAL_Y 1.000
INS_ACC3SCAL_Z 1.000
MOT_THST_EXPO 0.5
MOT_THST_HOVER 0.36
"""
params_path = os.path.join(target_dir, "default_params.txt")
debug("Writing out some default parameters to (%s)" % params_path)
with open(params_path, 'w') as fd:
fd.write(parameters)

def download_url_to_filepath(url, filepath, verbose=False):
def check_complete(count, block_size, total_size):
if verbose and total_size != -1 and (count * block_size) >= total_size:
print('Download Complete.')

try:
testfile = URLopener()
testfile.retrieve(url, filepath, check_complete)
except HTTPError as e:
if e.code == 404:
print('File Not Found: %s' % url)
sys.exit(1)
except IOError as e:
if e.args[1] == 404:
print('File Not Found: %s' % url)
sys.exit(1)

def download_old(system, version, target, verbose=False):
sitl_file = "{}/{}/sitl-{}-{}-{}.tar.gz".format(sitl_host, system, target, system, version)

# Delete old directories from legacy SITL.
Expand All @@ -106,27 +361,13 @@ def download(system, version, target, verbose=False):
if verbose:
print("Downloading SITL from %s" % sitl_file)

if not os.path.isdir(sitl_target):
os.makedirs(sitl_target)
os.makedirs(sitl_target, exist_ok=True)

def check_complete(count, block_size, total_size):
if verbose and total_size != -1 and (count * block_size) >= total_size:
print('Download Complete.')

try:
testfile = URLopener()
testfile.retrieve(sitl_file, sitl_target + '/sitl.tar.gz', check_complete)
except HTTPError as e:
if e.code == 404:
print('File Not Found: %s' % sitl_file)
sys.exit(1)
except IOError as e:
if e.args[1] == 404:
print('File Not Found: %s' % sitl_file)
sys.exit(1)
sitl_filepath = sitl_target + '/sitl.tar.gz'
download_url_to_filepath(sitl_file, sitl_filepath, verbose=verbose)

# TODO: cleanup sitl.tar.gz
tar = tarfile.open(sitl_target + '/sitl.tar.gz')
tar = tarfile.open(sitl_filepath)
tar.extractall(path=sitl_target + '/' + system + '-' + version)
tar.close()

Expand Down Expand Up @@ -193,7 +434,7 @@ def download(self, system, version, target=None, verbose=False):
if target == None:
target = detect_target()

if version == 'stable':
if version == 'stable' and not use_new_sitl_binaries():
try:
version = version_list()[system]['stable']
except:
Expand Down Expand Up @@ -320,6 +561,10 @@ def cleanup_sim():
if caps.has_defaults_path:
if self.defaults_filepath is not None:
args.extend(["--defaults", self.defaults_filepath])
else:
defaults_filepath = os.path.join(os.path.dirname(self.path), 'default_params.txt')
if os.path.exists(defaults_filepath):
args.extend(["--defaults", defaults_filepath])
if not use_saved_data:
args.append("-w")
else:
Expand Down Expand Up @@ -451,7 +696,7 @@ def start_default(lat=None, lon=None):
args["defaults_filepath"] = defaults
sitl = SITL(**args)
if do_download:
sitl.download('copter', '3.3', verbose=True)
sitl.download('copter', 'stable', verbose=True)
if ((lat is not None and lon is None) or
(lat is None and lon is not None)):
print("Supply both lat and lon, or neither")
Expand Down Expand Up @@ -481,8 +726,8 @@ def reset():
pass
print('SITL directory cleared.')

def main(args=None):
if args == None:
def main():
if True:
args = sys.argv[1:]

if sys.platform == 'win32':
Expand Down