From b37b7c009710a24c2ca04de82bb42544d26e2271 Mon Sep 17 00:00:00 2001 From: Alexandre 'Kidev' Poumaroux <1204936+Kidev@users.noreply.github.com> Date: Mon, 6 Jan 2025 16:45:56 +0100 Subject: [PATCH] Improve safety around downloaded binary execution --- aqt/installer.py | 192 ++++++++++++++++++++++++++++------------------- 1 file changed, 115 insertions(+), 77 deletions(-) diff --git a/aqt/installer.py b/aqt/installer.py index caaa9b93..4fed3d1d 100644 --- a/aqt/installer.py +++ b/aqt/installer.py @@ -1451,6 +1451,21 @@ def download_bin(_base_url): class CommercialInstaller: + + ALLOWED_INSTALLERS = frozenset( + {"qt-unified-windows-x64-online.exe", "qt-unified-macOS-x64-online.dmg", "qt-unified-linux-x64-online.run"} + ) + + ALLOWED_AUTO_ANSWER_OPTIONS = { + "OperationDoesNotExistError": frozenset({"Abort", "Ignore"}), + "OverwriteTargetDirectory": frozenset({"Yes", "No"}), + "stopProcessesForUpdates": frozenset({"Retry", "Ignore", "Cancel"}), + "installationErrorWithCancel": frozenset({"Retry", "Ignore", "Cancel"}), + "installationErrorWithIgnore": frozenset({"Retry", "Ignore"}), + "AssociateCommonFiletypes": frozenset({"Yes", "No"}), + "telemetry": frozenset({"Yes", "No"}), + } + def __init__( self, target: str, @@ -1500,18 +1515,26 @@ def __init__( self.installer_filename = self._get_installer_filename() self.qt_account = self._get_qt_account_path() - @staticmethod - def _validate_installer_path(installer_path: Path) -> None: + def _validate_installer_path(self, installer_path: Path) -> None: """Validate the installer path is safe and exists""" - ALLOWED_FILENAMES = { - "qt-unified-windows-x64-online.exe", - "qt-unified-macOS-x64-online.dmg", - "qt-unified-linux-x64-online.run", - } - if not installer_path.exists(): - raise ValueError(f"Installer path does not exist: {installer_path}") - if installer_path.name not in ALLOWED_FILENAMES: - raise ValueError(f"Invalid installer filename: {installer_path.name}") + try: + # Resolve to absolute path and check for symlink attacks + real_path = installer_path.resolve(strict=True) + + if real_path.name not in self.ALLOWED_INSTALLERS: + raise ValueError(f"Invalid installer filename: {real_path.name}") + + # Additional file checks + if not real_path.is_file(): + raise ValueError("Path is not a regular file") + + # On Unix systems, check permissions + if hasattr(os, "access"): + if not os.access(real_path, os.X_OK): + raise ValueError("Installer file is not executable") + + except (RuntimeError, OSError) as e: + raise ValueError(f"Invalid installer path: {e}") @staticmethod def _validate_dir_path(path: Optional[str]) -> None: @@ -1589,115 +1612,130 @@ def _get_package_name(self) -> str: return f"qt.qt{self.version.major}.{qt_version}.{self.arch}" def _get_install_command(self, installer_path: Path) -> list[str]: - """Build installation command with security validations""" - # Validate installer path + """Build installation command with enhanced security validations""" + # Validate installer path first self._validate_installer_path(installer_path) - # Start with validated installer path - cmd = [str(installer_path.absolute().resolve())] + # Start with empty command list + cmd: list[str] = [] + + # Resolve and validate installer path + resolved_installer = str(installer_path.resolve(strict=True)) + if not resolved_installer: + raise ValueError("Empty installer path") + cmd.append(resolved_installer) - # Validate and add authentication if provided + # Validate credentials if provided if self.username and self.password: if not isinstance(self.username, str) or not isinstance(self.password, str): - raise ValueError("Username and password must be strings") + raise ValueError("Invalid credential types") + if not self.username.strip() or not self.password.strip(): + raise ValueError("Empty credentials") cmd.extend(["--email", self.username, "--pw", self.password]) - # Validate and add installation directory if provided + # Validate installation directory if self.output_dir: - self._validate_dir_path(self.output_dir) - cmd.extend(["--root", str(Path(self.output_dir).absolute().resolve())]) - - # Validate all auto-answer options - self._validate_auto_answer_option( - "OperationDoesNotExistError", self.operation_does_not_exist_error, {"Abort", "Ignore"} - ) - self._validate_auto_answer_option("OverwriteTargetDirectory", self.overwrite_target_dir, {"Yes", "No"}) - self._validate_auto_answer_option( - "stopProcessesForUpdates", self.stop_processes_for_updates, {"Retry", "Ignore", "Cancel"} - ) - self._validate_auto_answer_option( - "installationErrorWithCancel", self.installation_error_with_cancel, {"Retry", "Ignore", "Cancel"} - ) - self._validate_auto_answer_option( - "installationErrorWithIgnore", self.installation_error_with_ignore, {"Retry", "Ignore"} - ) - self._validate_auto_answer_option("AssociateCommonFiletypes", self.associate_common_filetypes, {"Yes", "No"}) - self._validate_auto_answer_option("telemetry", self.telemetry, {"Yes", "No"}) - - # Build auto-answer string with validated options - auto_answer_options = [ - f"OperationDoesNotExistError={self.operation_does_not_exist_error}", - f"OverwriteTargetDirectory={self.overwrite_target_dir}", - f"stopProcessesForUpdates={self.stop_processes_for_updates}", - f"installationErrorWithCancel={self.installation_error_with_cancel}", - f"installationErrorWithIgnore={self.installation_error_with_ignore}", - f"AssociateCommonFiletypes={self.associate_common_filetypes}", - f"telemetry-question={self.telemetry}", + resolved_output = str(Path(self.output_dir).resolve(strict=True)) + if not resolved_output: + raise ValueError("Invalid output directory") + cmd.extend(["--root", resolved_output]) + + # Build auto-answer options with enhanced validation + auto_answers = [] + for option, value in [ + ("OperationDoesNotExistError", self.operation_does_not_exist_error), + ("OverwriteTargetDirectory", self.overwrite_target_dir), + ("stopProcessesForUpdates", self.stop_processes_for_updates), + ("installationErrorWithCancel", self.installation_error_with_cancel), + ("installationErrorWithIgnore", self.installation_error_with_ignore), + ("AssociateCommonFiletypes", self.associate_common_filetypes), + ("telemetry-question", self.telemetry), + ]: + # Check against allowed values + allowed_values = self.ALLOWED_AUTO_ANSWER_OPTIONS.get(option) + if not allowed_values or value not in allowed_values: + raise ValueError(f"Invalid value '{value}' for option {option}") + auto_answers.append(f"{option}={value}") + + # Add validated auto-answer string + auto_answer_string = ",".join(auto_answers) + if not auto_answer_string: + raise ValueError("Empty auto-answer string") + + # Add standard arguments and package + standard_args = [ + "--accept-licenses", + "--accept-obligations", + "--confirm-command", + "--auto-answer", + auto_answer_string, + "install", + self._get_package_name(), ] - auto_answer_string = ",".join(filter(None, auto_answer_options)) - - # Add standard arguments - cmd.extend( - [ - "--unattended", - "--accept-licenses", - "--accept-obligations", - "--confirm-command", - "--auto-answer", - auto_answer_string, - "install", - self._get_package_name(), - ] - ) + cmd.extend(standard_args) + + # Final validation of complete command + if not all(isinstance(arg, str) for arg in cmd): + raise ValueError("Non-string arguments detected") + if not all(arg.strip() for arg in cmd): + raise ValueError("Empty arguments detected") return cmd def install(self) -> None: - """Run commercial installation""" - # Verify auth + """Run commercial installation with enhanced security""" if not self.qt_account.exists() and not (self.username and self.password): raise CliInputError( "No Qt account credentials found. Either provide --user and --password " f"or ensure {self.qt_account} exists" ) - # Create temp dir for installer - with tempfile.TemporaryDirectory() as temp_dir: - installer_path = Path(temp_dir) / self.installer_filename + with tempfile.TemporaryDirectory(prefix="qt_install_") as temp_dir: + # Create temp dir with restricted permissions + temp_path = Path(temp_dir) + if hasattr(os, "chmod"): + os.chmod(temp_dir, 0o500) # Read and execute for owner only + + installer_path = temp_path / self.installer_filename - # Download installer self.logger.info(f"Downloading Qt online installer to {installer_path}") self._download_installer(installer_path) - # Run installation with security validations self.logger.info("Starting Qt installation") try: - # Get validated command cmd = self._get_install_command(installer_path) - # Additional security checks - if not all(isinstance(arg, str) for arg in cmd): - raise ValueError("Command contains non-string arguments") - # Log sanitized command (exclude password) - safe_cmd = cmd.copy() + safe_cmd = list(cmd) if "--pw" in safe_cmd: pw_index = safe_cmd.index("--pw") if len(safe_cmd) > pw_index + 1: safe_cmd[pw_index + 1] = "********" self.logger.info(f"Running: {' '.join(safe_cmd)}") - # Run with explicit shell=False for security - subprocess.check_call(cmd, shell=False) + # Execute with explicit security settings + subprocess.check_call( + cmd, + shell=False, + env=os.environ.copy(), # Use copy of environment + cwd=temp_dir, # Run from temp directory + timeout=3600, # Set reasonable timeout + ) except subprocess.CalledProcessError as e: error_msg = f"Qt installation failed with code {e.returncode}" if e.stderr: error_msg += f": {e.stderr.decode('utf-8', errors='replace')}" raise CliInputError(error_msg) + except subprocess.TimeoutExpired: + raise CliInputError("Installation timed out") except (ValueError, OSError) as e: raise CliInputError(f"Installation preparation failed: {str(e)}") + finally: + # Ensure cleanup + if installer_path.exists(): + installer_path.unlink() self.logger.info("Qt installation completed successfully")