From bc63a9b835ae91d14a619e187373117ea8f1c8f4 Mon Sep 17 00:00:00 2001 From: Gerrod Ubben Date: Tue, 15 Aug 2023 20:06:52 -0400 Subject: [PATCH] Add migrate endpoint to move artifacts to another storage backend fixes: #3358 --- CHANGES/3358.feature | 1 + pulpcore/app/serializers/__init__.py | 2 +- pulpcore/app/serializers/domain.py | 112 ++++++++++++------ pulpcore/app/tasks/__init__.py | 2 + pulpcore/app/tasks/migrate.py | 60 ++++++++++ pulpcore/app/viewsets/domain.py | 48 +++++++- pulpcore/pytest_plugin.py | 42 +++++-- .../api/using_plugin/test_migrate.py | 111 +++++++++++++++++ 8 files changed, 328 insertions(+), 50 deletions(-) create mode 100644 CHANGES/3358.feature create mode 100644 pulpcore/app/tasks/migrate.py create mode 100644 pulpcore/tests/functional/api/using_plugin/test_migrate.py diff --git a/CHANGES/3358.feature b/CHANGES/3358.feature new file mode 100644 index 0000000000..e584ecd5f0 --- /dev/null +++ b/CHANGES/3358.feature @@ -0,0 +1 @@ +Added new `/migrate/` endpoint to Domains that allows for migrating artifacts from one storage backend to another. diff --git a/pulpcore/app/serializers/__init__.py b/pulpcore/app/serializers/__init__.py index 0d8be05b3b..ccdc5327bc 100644 --- a/pulpcore/app/serializers/__init__.py +++ b/pulpcore/app/serializers/__init__.py @@ -51,7 +51,7 @@ SigningServiceSerializer, SingleArtifactContentSerializer, ) -from .domain import DomainSerializer +from .domain import DomainSerializer, DomainBackendMigratorSerializer from .exporter import ( ExporterSerializer, ExportSerializer, diff --git a/pulpcore/app/serializers/domain.py b/pulpcore/app/serializers/domain.py index 0ccd7f9e95..fb0472f03c 100644 --- a/pulpcore/app/serializers/domain.py +++ b/pulpcore/app/serializers/domain.py @@ -1,4 +1,5 @@ from gettext import gettext as _ +import json from django.conf import settings from django.core.files.storage import import_string @@ -9,7 +10,7 @@ from rest_framework import serializers from rest_framework.validators import UniqueValidator -from pulpcore.app import models +from pulpcore.app.models import Domain from pulpcore.app.serializers import IdentityField, ModelSerializer, HiddenFieldsMixin @@ -104,12 +105,12 @@ class SFTPSettingsSerializer(BaseSettingsClass): SETTING_MAPPING = { "sftp_storage_host": "host", "sftp_storage_params": "params", - # 'sftp_storage_interactive': 'interactive', # Can not allow users to set to True + "sftp_storage_interactive": "interactive", "sftp_storage_file_mode": "file_mode", "sftp_storage_dir_mode": "dir_mode", "sftp_storage_uid": "uid", "sftp_storage_gid": "gid", - # 'sftp_known_host_file': 'known_host_file', # This is dangerous to allow to be set + "sftp_known_host_file": "known_host_file", "sftp_storage_root": "root_path", "media_url": "base_url", "sftp_base_url": "base_url", @@ -123,6 +124,8 @@ class SFTPSettingsSerializer(BaseSettingsClass): uid = serializers.CharField(allow_null=True, default=None) gid = serializers.CharField(allow_null=True, default=None) base_url = serializers.CharField(allow_null=True, default=None) + interactive = serializers.HiddenField(default=False) + known_host_file = serializers.HiddenField(default=None) class TransferConfigSerializer(serializers.Serializer): @@ -187,7 +190,6 @@ class AmazonS3SettingsSerializer(BaseSettingsClass): access_key = serializers.CharField(required=True, write_only=True) secret_key = serializers.CharField(allow_null=True, default=None, write_only=True) security_token = serializers.CharField(allow_null=True, default=None, write_only=True) - # Too dangerous to use shared cred file, ensure is always False session_profile = serializers.HiddenField(default=False) file_overwrite = serializers.BooleanField(default=True) object_parameters = serializers.DictField(default={}) @@ -320,7 +322,7 @@ class GoogleSettingsSerializer(BaseSettingsClass): child=serializers.CharField(), default=DEFAULT_CONTENT_TYPES ) file_overwrite = serializers.BooleanField(default=True) # This should always be True - object_parameters = serializers.DictField(default=dict()) + object_parameters = serializers.DictField(default={}) max_memory_size = serializers.IntegerField(default=0) blob_chunk_size = serializers.IntegerField(allow_null=True, default=None) @@ -349,9 +351,22 @@ def to_internal_value(self, data): """Appropriately convert the incoming data based on the Domain's storage class.""" # Handle Creating & Updating storage_settings = self.root.initial_data.get("storage_settings", {}) + if not isinstance(storage_settings, dict): + if isinstance(storage_settings, str): + try: + storage_settings = json.loads(storage_settings) + except json.JSONDecodeError: + raise serializers.ValidationError("Improper JSON string passed in") + else: + raise serializers.ValidationError("Storage settings should be a JSON object.") + if self.root.instance: - storage_class = self.root.instance.storage_class - storage_settings = {**self.root.instance.storage_settings, **storage_settings} + # Use passed in values, if not present fallback onto current values of instance + storage_class = self.root.initial_data.get( + "storage_class", self.root.instance.storage_class + ) + if storage_class == self.root.instance.storage_class: + storage_settings = {**self.root.instance.storage_settings, **storage_settings} else: storage_class = self.root.initial_data["storage_class"] @@ -365,21 +380,52 @@ def to_internal_value(self, data): def create_storage(self): """Instantiate a storage class based on the Domain's storage class.""" - instance = self.root.instance - serializer_class = self.STORAGE_MAPPING[instance.storage_class] - serializer = serializer_class(data=instance.storage_settings) + if self.root.instance: + storage_class = self.root.instance.storage_class + storage_settings = self.root.instance.storage_settings + else: + storage_class = self.root.initial_data["storage_class"] + storage_settings = self.root.initial_data["storage_settings"] + serializer_class = self.STORAGE_MAPPING[storage_class] + serializer = serializer_class(data=storage_settings) serializer.is_valid(raise_exception=True) return serializer.create(serializer.validated_data) -class DomainSerializer(ModelSerializer): +class BackendSettingsValidator: + """Mixin to handle validating `storage_class` and `storage_settings`.""" + + @staticmethod + def _validate_storage_backend(storage_class, storage_settings): + """Ensure that the backend can be used.""" + try: + backend = import_string(storage_class) + except (ImportError, ImproperlyConfigured): + raise serializers.ValidationError( + detail={"storage_class": _("Backend is not installed on Pulp.")} + ) + + try: + backend(**storage_settings) + except ImproperlyConfigured as e: + raise serializers.ValidationError( + detail={ + "storage_settings": _("Backend settings contain incorrect values: {}".format(e)) + } + ) + + def create_storage(self): + return self.fields["storage_settings"].create_storage() + + +class DomainSerializer(BackendSettingsValidator, ModelSerializer): """Serializer for Domain.""" pulp_href = IdentityField(view_name="domains-detail") name = serializers.SlugField( max_length=50, help_text=_("A name for this domain."), - validators=[UniqueValidator(queryset=models.Domain.objects.all())], + validators=[UniqueValidator(queryset=Domain.objects.all())], ) description = serializers.CharField( help_text=_("An optional description."), required=False, allow_null=True @@ -406,24 +452,6 @@ def validate_name(self, value): raise serializers.ValidationError(_("Name can not be 'api' or 'content'.")) return value - def _validate_storage_backend(self, storage_class, storage_settings): - """Ensure that the backend can be used.""" - try: - backend = import_string(storage_class) - except (ImportError, ImproperlyConfigured): - raise serializers.ValidationError( - detail={"storage_class": _("Backend is not installed on Pulp.")} - ) - - try: - backend(**storage_settings) - except ImproperlyConfigured as e: - raise serializers.ValidationError( - detail={ - "storage_settings": _("Backend settings contain incorrect values: {}".format(e)) - } - ) - def validate(self, data): """Ensure that Domain settings are valid.""" # Validate for update gets called before ViewSet default check @@ -448,11 +476,8 @@ def validate(self, data): ) return data - def create_storage(self): - return self.fields["storage_settings"].create_storage() - class Meta: - model = models.Domain + model = Domain fields = ModelSerializer.Meta.fields + ( "name", "description", @@ -461,3 +486,22 @@ class Meta: "redirect_to_object_storage", "hide_guarded_distributions", ) + + +class DomainBackendMigratorSerializer(BackendSettingsValidator, serializers.Serializer): + """Special serializer for performing a storage backend migration on a Domain.""" + + storage_class = serializers.ChoiceField( + help_text=_("The new backend storage class to migrate to."), + choices=BACKEND_CHOICES, + ) + storage_settings = StorageSettingsSerializer( + source="*", help_text=_("The settings for the new storage class to migrate to.") + ) + + def validate(self, data): + """Validate new backend settings.""" + storage_class = data["storage_class"] + storage_settings = data["storage_settings"] + self._validate_storage_backend(storage_class, storage_settings) + return data diff --git a/pulpcore/app/tasks/__init__.py b/pulpcore/app/tasks/__init__.py index 721baebdfd..64385663dd 100644 --- a/pulpcore/app/tasks/__init__.py +++ b/pulpcore/app/tasks/__init__.py @@ -12,6 +12,8 @@ from .importer import pulp_import +from .migrate import migrate_backend + from .orphan import orphan_cleanup from .purge import purge diff --git a/pulpcore/app/tasks/migrate.py b/pulpcore/app/tasks/migrate.py new file mode 100644 index 0000000000..e556a4185c --- /dev/null +++ b/pulpcore/app/tasks/migrate.py @@ -0,0 +1,60 @@ +import logging +from gettext import gettext as _ + +from django.utils.timezone import now +from rest_framework.serializers import ValidationError +from pulpcore.app.models import Artifact, storage, ProgressReport +from pulpcore.app.serializers import DomainBackendMigratorSerializer +from pulpcore.app.util import get_domain + +_logger = logging.getLogger(__name__) + + +def migrate_backend(data): + """ + Copy the artifacts from the current storage backend to a new one. Then update backend settings. + + Args: + data (dict): validated data of the new storage backend settings + """ + domain = get_domain() + old_storage = domain.get_storage() + new_storage = DomainBackendMigratorSerializer(data=data).create_storage() + + artifacts = Artifact.objects.filter(pulp_domain=domain) + date = now() + + with ProgressReport( + message=_("Migrating Artifacts"), code="migrate", total=artifacts.count() + ) as pb: + while True: + for digest in pb.iter(artifacts.values_list("sha256", flat=True)): + filename = storage.get_artifact_path(digest) + if not new_storage.exists(filename): + try: + file = old_storage.open(filename) + except FileNotFoundError: + raise ValidationError( + _( + "Found missing file for artifact(sha256={}). Please run the repair " + "task or delete the offending artifact." + ).format(digest) + ) + new_storage.save(filename, file) + file.close() + # Handle new artifacts saved by the content app + artifacts = Artifact.objects.filter(pulp_domain=domain, pulp_created__gte=date) + if count := artifacts.count(): + pb.total += count + pb.save() + date = now() + continue + break + + # Update the current domain to the new storage backend settings + msg = _("Update Domain({domain})'s Backend Settings").format(domain=domain.name) + with ProgressReport(message=msg, code="update", total=1) as pb: + domain.storage_class = data["storage_class"] + domain.storage_settings = data["storage_settings"] + domain.save(update_fields=["storage_class", "storage_settings"], skip_hooks=True) + pb.increment() diff --git a/pulpcore/app/viewsets/domain.py b/pulpcore/app/viewsets/domain.py index 9394cffd56..40ef1efd61 100644 --- a/pulpcore/app/viewsets/domain.py +++ b/pulpcore/app/viewsets/domain.py @@ -2,13 +2,21 @@ from drf_spectacular.utils import extend_schema from rest_framework import mixins +from rest_framework.decorators import action from rest_framework.exceptions import ValidationError from pulpcore.filters import BaseFilterSet from pulpcore.app.models import Domain -from pulpcore.app.serializers import DomainSerializer, AsyncOperationResponseSerializer +from pulpcore.app.response import OperationPostponedResponse +from pulpcore.app.serializers import ( + DomainSerializer, + DomainBackendMigratorSerializer, + AsyncOperationResponseSerializer, +) +from pulpcore.app.tasks import migrate_backend from pulpcore.app.viewsets import NamedModelViewSet, AsyncRemoveMixin, AsyncUpdateMixin from pulpcore.app.viewsets.base import NAME_FILTER_OPTIONS +from pulpcore.tasking.tasks import dispatch class DomainFilter(BaseFilterSet): @@ -57,7 +65,7 @@ class DomainViewSet( "condition": "has_model_or_obj_perms:core.view_domain", }, { - "action": ["update", "partial_update"], + "action": ["update", "partial_update", "migrate"], "principal": "authenticated", "effect": "allow", "condition": "has_model_or_obj_perms:core.change_domain", @@ -118,3 +126,39 @@ def destroy(self, request, pk, **kwargs): raise ValidationError(_("Default domain can not be deleted.")) return super().destroy(request, pk, **kwargs) + + @extend_schema( + summary="Migrate storage backend", + request=DomainBackendMigratorSerializer, + responses={202: AsyncOperationResponseSerializer}, + ) + @action(detail=False, methods=["post"]) + def migrate(self, request, **kwargs): + """ + Migrate the domain's storage backend to a new one. + + Launches a background task to copy the domain's artifacts over to the supplied storage + backend. Then updates the domain's storage settings to the new storage backend. This task + does not delete the stored files of the artifacts from the previous backend. + + **IMPORTANT** This task will block all other tasks within the domain until the migration is + completed, essentially putting the domain into a read only state. Content will still be + served from the old storage backend until the migration has completed, so don't remove + the old backend until then. Note, this endpoint is not allowed on the default domain. + + This feature is in Tech Preview and is subject to future change and thus not guaranteed to + be backwards compatible. + """ + instance = request.pulp_domain + data = request.data + if instance.name == "default": + raise ValidationError(_("Default domain can not be migrated.")) + serializer = DomainBackendMigratorSerializer(data=data) + serializer.is_valid(raise_exception=True) + + task = dispatch( + migrate_backend, + args=(data,), + exclusive_resources=[instance], + ) + return OperationPostponedResponse(task, request) diff --git a/pulpcore/pytest_plugin.py b/pulpcore/pytest_plugin.py index 716a8bf1ea..c11623419d 100644 --- a/pulpcore/pytest_plugin.py +++ b/pulpcore/pytest_plugin.py @@ -656,13 +656,13 @@ def random_artifact(random_artifact_factory): return random_artifact_factory() -@pytest.fixture() -def domain_factory(pulpcore_bindings, pulp_settings, gen_object_with_cleanup): - def _domain_factory(): +@pytest.fixture +def backend_settings_factory(pulp_settings): + def _settings_factory(storage_class=None, storage_settings=None): if not pulp_settings.DOMAIN_ENABLED: pytest.skip("Domains not enabled") keys = dict() - keys["pulpcore.app.models.storage.FileSystem"] = ["MEDIA_ROOT"] + keys["pulpcore.app.models.storage.FileSystem"] = ["MEDIA_ROOT", "MEDIA_URL"] keys["storages.backends.s3boto3.S3Boto3Storage"] = [ "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", @@ -681,15 +681,31 @@ def _domain_factory(): "AZURE_LOCATION", "AZURE_CONNECTION_STRING", ] - settings = dict() - for key in keys[pulp_settings.DEFAULT_FILE_STORAGE]: - settings[key] = getattr(pulp_settings, key, None) - body = { - "name": str(uuid.uuid4()), - "storage_class": pulp_settings.DEFAULT_FILE_STORAGE, - "storage_settings": settings, - } - return gen_object_with_cleanup(pulpcore_bindings.DomainsApi, body) + settings = storage_settings or dict() + backend = storage_class or pulp_settings.DEFAULT_FILE_STORAGE + for key in keys[backend]: + if key not in settings: + settings[key] = getattr(pulp_settings, key, None) + return backend, settings + + return _settings_factory + + +@pytest.fixture() +def domain_factory( + pulpcore_bindings, pulp_domain_enabled, backend_settings_factory, gen_object_with_cleanup +): + def _domain_factory(**kwargs): + if not pulp_domain_enabled: + pytest.skip("Domains not enabled") + + storage_class, storage_settings = backend_settings_factory( + storage_class=kwargs.pop("storage_class", None), + storage_settings=kwargs.pop("storage_settings", None), + ) + kwargs.setdefault("name", str(uuid.uuid4())) + kwargs.update({"storage_class": storage_class, "storage_settings": storage_settings}) + return gen_object_with_cleanup(pulpcore_bindings.DomainsApi, kwargs) return _domain_factory diff --git a/pulpcore/tests/functional/api/using_plugin/test_migrate.py b/pulpcore/tests/functional/api/using_plugin/test_migrate.py new file mode 100644 index 0000000000..37bf5da395 --- /dev/null +++ b/pulpcore/tests/functional/api/using_plugin/test_migrate.py @@ -0,0 +1,111 @@ +import pytest +import json +import shutil + +from pathlib import Path +from pulpcore.client.pulpcore import ApiException + + +@pytest.mark.parallel +def test_migrate_bad_settings(pulpcore_bindings, domain_factory): + """Test that backend settings are validated before launching the task.""" + domain = domain_factory(storage_class="pulpcore.app.models.storage.FileSystem") + settings = {} + # Test with missing field + body = {"storage_class": "pulpcore.app.models.storage.FileSystem", "storage_settings": settings} + kwargs = {"pulp_domain": domain.name} + with pytest.raises(ApiException) as e: + pulpcore_bindings.DomainsApi.migrate(body, **kwargs) + error_body = json.loads(e.value.body) + assert "storage_settings" in error_body + assert {"location": ["This field is required."]} == error_body["storage_settings"] + # Test with unexpected field + settings["location"] = "/var/lib/pulp/media" + settings["random"] = "random" + body["storage_settings"] = settings + with pytest.raises(ApiException) as e: + pulpcore_bindings.DomainsApi.migrate(body, **kwargs) + assert e.value.status == 400 + error_body = json.loads(e.value.body) + assert "storage_settings" in error_body + assert "Unexpected field" in error_body["storage_settings"].values() + + +@pytest.mark.parallel +def test_migrate_default_domain(pulpcore_bindings, pulp_domain_enabled): + """Test the default domain can not be migrated.""" + domain = pulpcore_bindings.DomainsApi.list(name="default").results[0] + + kwargs = {} + if pulp_domain_enabled: + kwargs["pulp_domain"] = domain.name + with pytest.raises(pulpcore_bindings.ApiException) as e: + pulpcore_bindings.DomainsApi.migrate({}, **kwargs) + assert e.value.status == 400 + assert "Default domain can not be migrated" in e.value.body + + +@pytest.mark.parallel +def test_migrate_domain( + pulpcore_bindings, + domain_factory, + backend_settings_factory, + monitor_task, + random_artifact_factory, +): + """Test migrating from a domain.""" + domain = domain_factory() + domain_id = domain.pulp_href.split("/")[-2] + artifacts = [random_artifact_factory(pulp_domain=domain.name) for _ in range(3)] + old_storage, old_settings = backend_settings_factory() + + body = { + "storage_class": "pulpcore.app.models.storage.FileSystem", + "storage_settings": {"location": "/var/lib/pulp/media"}, + } + task = monitor_task(pulpcore_bindings.DomainsApi.migrate(body, pulp_domain=domain.name).task) + + # Check that the progress reports and 'done's are right + reports = task.progress_reports + assert len(reports) == 2 + assert {3, 1} == {r.done for r in reports} + + # Check that the domain's storage settings was updated + domain = pulpcore_bindings.DomainsApi.read(domain.pulp_href) + assert domain.storage_class == "pulpcore.app.models.storage.FileSystem" + assert domain.storage_settings["location"] == "/var/lib/pulp/media" + + # Check that the files were actually moved + expected_paths = set() + for artifact in artifacts: + a = f"/var/lib/pulp/media/artifact/{domain_id}/{artifact.sha256[0:2]}/{artifact.sha256[2:]}" + expected_paths.add(a) + found_paths = {str(p) for p in Path(f"/var/lib/pulp/media/artifact/{domain_id}").glob("*/*")} + + # Perform cleanup before final assert + shutil.rmtree(f"/var/lib/pulp/media/artifact/{domain_id}") + # Restore original domain settings without migrating + body = {"storage_class": old_storage, "storage_settings": old_settings} + monitor_task(pulpcore_bindings.DomainsApi.partial_update(domain.pulp_href, body).task) + + assert len(expected_paths) == 3 + assert expected_paths == found_paths + + +@pytest.mark.parallel +def test_migrate_empty_domain( + pulpcore_bindings, domain_factory, backend_settings_factory, monitor_task +): + """Test migrating works even when there are no artifacts.""" + domain = domain_factory() + + storage, settings = backend_settings_factory() + body = {"storage_class": storage, "storage_settings": settings} + task = monitor_task(pulpcore_bindings.DomainsApi.migrate(body, pulp_domain=domain.name).task) + + reports = task.progress_reports + assert len(reports) == 2 + for report in reports: + if report.message == "Migrating Artifacts": + assert report.total == 0 + assert report.done == 0