diff --git a/CHANGES/5493.feature b/CHANGES/5493.feature
new file mode 100644
index 00000000000..435f726e41c
--- /dev/null
+++ b/CHANGES/5493.feature
@@ -0,0 +1,3 @@
+Optimized the replication mechanism to skip syncing when replicated distributions were not updated.
+In general, the optimization applies only to distributions whose upstream counterparts have a
+publication or repository version attached.
diff --git a/pulpcore/app/migrations/0122_record_timestamps_for_replicated_distributions.py b/pulpcore/app/migrations/0122_record_timestamps_for_replicated_distributions.py
new file mode 100644
index 00000000000..0a0e5490f6d
--- /dev/null
+++ b/pulpcore/app/migrations/0122_record_timestamps_for_replicated_distributions.py
@@ -0,0 +1,36 @@
+# Generated by Django 4.2.11 on 2024-07-30 21:41
+
+from django.db import migrations, models
+import django.db.models.deletion
+import django_lifecycle.mixins
+import pulpcore.app.models.base
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('core', '0121_add_profile_artifacts_table'),
+    ]
+
+    operations = [
+        migrations.CreateModel(
+            name='DistributionUpdatedTimestamp',
+            fields=[
+                ('pulp_id', models.UUIDField(default=pulpcore.app.models.base.pulp_uuid, editable=False, primary_key=True, serialize=False)),
+                ('pulp_created', models.DateTimeField(auto_now_add=True)),
+                ('pulp_last_updated', models.DateTimeField(auto_now=True, null=True)),
+                ('last_updated', models.DateTimeField()),
+                ('distribution', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='core.distribution')),
+                ('upstream_pulp', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='core.upstreampulp')),
+            ],
+            options={
+                'abstract': False,
+            },
+            bases=(django_lifecycle.mixins.LifecycleModelMixin, models.Model),
+        ),
+        migrations.AddField(
+            model_name='upstreampulp',
+            name='last_updated_timestamps',
+            field=models.ManyToManyField(through='core.DistributionUpdatedTimestamp', to='core.distribution'),
+        ),
+    ]
diff --git a/pulpcore/app/models/__init__.py b/pulpcore/app/models/__init__.py
index 4eb55919d36..55720dab9e2 100644
--- a/pulpcore/app/models/__init__.py
+++ b/pulpcore/app/models/__init__.py
@@ -91,4 +91,4 @@
 from .progress import GroupProgressReport, ProgressReport
 
 # Moved here to avoid a circular import with GroupProgressReport
-from .replica import UpstreamPulp
+from .replica import UpstreamPulp, DistributionUpdatedTimestamp
diff --git a/pulpcore/app/models/replica.py b/pulpcore/app/models/replica.py
index 348ff7a8ed3..aca5aa9582b 100644
--- a/pulpcore/app/models/replica.py
+++ b/pulpcore/app/models/replica.py
@@ -6,8 +6,13 @@
 """
 from django.db import models
 
-from pulpcore.plugin.models import BaseModel, EncryptedTextField, AutoAddObjPermsMixin
 from pulpcore.app.util import get_domain_pk
+from pulpcore.plugin.models import (
+    AutoAddObjPermsMixin,
+    BaseModel,
+    Distribution,
+    EncryptedTextField,
+)
 
 
 class UpstreamPulp(BaseModel, AutoAddObjPermsMixin):
@@ -28,9 +33,19 @@ class UpstreamPulp(BaseModel, AutoAddObjPermsMixin):
 
     pulp_label_select = models.TextField(null=True)
 
+    last_updated_timestamps = models.ManyToManyField(
+        Distribution, through="DistributionUpdatedTimestamp"
+    )
+
     class Meta:
         unique_together = ("name", "pulp_domain")
         permissions = [
             ("replicate_upstreampulp", "Can start a replication task"),
             ("manage_roles_upstreampulp", "Can manage roles on upstream pulps"),
         ]
+
+
+class DistributionUpdatedTimestamp(BaseModel):
+    distribution = models.ForeignKey(Distribution, on_delete=models.CASCADE)
+    upstream_pulp = models.ForeignKey(UpstreamPulp, on_delete=models.CASCADE)
+    last_updated = models.DateTimeField()
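Note (not part of the patch): the migration and model above introduce one timestamp record per (upstream server, replicated distribution) pair. The sketch below shows how that through model could be queried once the migration is applied; the helper name recorded_timestamp is made up for illustration and relies only on the models defined above.

# Illustrative sketch only -- not part of the patch.
from pulpcore.app.models import DistributionUpdatedTimestamp, UpstreamPulp


def recorded_timestamp(server: UpstreamPulp, distribution_name: str):
    """Return the upstream timestamp recorded for one replicated distribution, or None."""
    return (
        DistributionUpdatedTimestamp.objects.filter(
            upstream_pulp=server, distribution__name=distribution_name
        )
        .values_list("last_updated", flat=True)
        .first()
    )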
diff --git a/pulpcore/app/replica.py b/pulpcore/app/replica.py
index 2ca7f457571..8871b67c06d 100644
--- a/pulpcore/app/replica.py
+++ b/pulpcore/app/replica.py
@@ -1,6 +1,8 @@
-from django.db.models import Model
 import logging
 
+from django.db.models import Model
+from django.utils.dateparse import parse_datetime
+
 from pulp_glue.common.context import PulpContext
 from pulpcore.tasking.tasks import dispatch
 from pulpcore.app.tasks.base import (
@@ -8,6 +10,12 @@
     general_create,
     general_multi_delete,
 )
+from pulpcore.app.models import (
+    Distribution,
+    DistributionUpdatedTimestamp,
+    UpstreamPulp,
+)
+
 from pulpcore.plugin.util import get_url, get_domain
 
 _logger = logging.getLogger(__name__)
@@ -72,11 +80,16 @@ def needs_update(fields_dict, model_instance):
                     needs_update = True
         return needs_update
 
+    def upstream_repository(self, upstream_distribution):
+        if upstream_distribution["repository"]:
+            return self.repository_ctx_cls(self.pulp_ctx).show(upstream_distribution["repository"])
+
     def upstream_distributions(self, labels=None):
         if labels:
             params = {"pulp_label_select": labels}
         else:
             params = {}
+
         offset = 0
         list_size = 100
         while True:
@@ -159,22 +172,41 @@ def distribution_data(self, repository, upstream_distribution):
             "base_path": upstream_distribution["base_path"],
         }
 
-    def create_or_update_distribution(self, repository, upstream_distribution):
+    def create_or_update_distribution(
+        self, server, repository, upstream_repository, upstream_distribution
+    ):
         distribution_data = self.distribution_data(repository, upstream_distribution)
+        last_updated = get_last_updated(upstream_repository, upstream_distribution)
         try:
             distro = self.distribution_model_cls.objects.get(
                 name=upstream_distribution["name"], pulp_domain=self.domain
             )
-            # Check that the distribution has the right repository associated
+            try:
+                distribution_timestamp = DistributionUpdatedTimestamp.objects.get(
+                    distribution=distro,
+                    upstream_pulp=server,
+                )
+            except DistributionUpdatedTimestamp.DoesNotExist:
+                distribution_timestamp = DistributionUpdatedTimestamp.objects.create(
+                    distribution=distro,
+                    upstream_pulp=server,
+                    last_updated=parse_datetime(last_updated),
+                )
             needs_update = self.needs_update(distribution_data, distro)
-            if needs_update:
+            if needs_update or parse_datetime(last_updated) != distribution_timestamp.last_updated:
                 # Update the distribution
                 dispatch(
-                    general_update,
+                    distribution_update,
                     task_group=self.task_group,
                     shared_resources=[repository],
                     exclusive_resources=self.distros_uris,
-                    args=(distro.pk, self.app_label, self.distribution_serializer_name),
+                    args=(
+                        distro.pk,
+                        self.app_label,
+                        self.distribution_serializer_name,
+                        server.pk,
+                        last_updated,
+                    ),
                     kwargs={
                         "data": distribution_data,
                         "partial": True,
@@ -182,13 +214,19 @@ def create_or_update_distribution(self, repository, upstream_distribution):
                 )
         except self.distribution_model_cls.DoesNotExist:
             # Dispatch a task to create the distribution
-            distribution_data["name"] = upstream_distribution["name"]
+            distribution_name = distribution_data["name"] = upstream_distribution["name"]
             dispatch(
-                general_create,
+                distribution_create,
                 task_group=self.task_group,
                 shared_resources=[repository],
                 exclusive_resources=self.distros_uris,
-                args=(self.app_label, self.distribution_serializer_name),
+                args=(
+                    self.app_label,
+                    self.distribution_serializer_name,
+                    distribution_name,
+                    server.pk,
+                    last_updated,
+                ),
                 kwargs={"data": distribution_data},
             )
 
@@ -246,3 +284,39 @@ def remove_missing(self, names):
             exclusive_resources=repositories + remotes,
             args=(repository_ids + remote_ids,),
         )
+
+
+def get_last_updated(upstream_repository, upstream_distribution):
+    if upstream_repository:
+        if upstream_repository["pulp_last_updated"] > upstream_distribution["pulp_last_updated"]:
+            return upstream_repository["pulp_last_updated"]
+        else:
+            return upstream_distribution["pulp_last_updated"]
+    else:
+        return upstream_distribution["pulp_last_updated"]
+
+
+def distribution_update(distro_pk, app_label, serializer_name, server_pk, last_updated, **kwargs):
+    general_update(distro_pk, app_label, serializer_name, **kwargs)
+    upstream_timestamp_update(distro_pk, server_pk, last_updated)
+
+
+def upstream_timestamp_update(distro_pk, server_pk, last_updated):
+    distribution = Distribution.objects.get(pk=distro_pk, pulp_domain=get_domain())
+    server = UpstreamPulp.objects.get(pk=server_pk)
+    server.distributionupdatedtimestamp_set.filter(distribution=distribution).update(
+        last_updated=parse_datetime(last_updated)
+    )
+
+
+def distribution_create(app_label, serializer_name, distro_name, server_pk, last_updated, **kwargs):
+    general_create(app_label, serializer_name, **kwargs)
+    upstream_timestamp_create(distro_name, server_pk, last_updated)
+
+
+def upstream_timestamp_create(distribution_name, server_pk, last_updated):
+    distribution = Distribution.objects.get(name=distribution_name, pulp_domain=get_domain())
+    server = UpstreamPulp.objects.get(pk=server_pk)
+    DistributionUpdatedTimestamp.objects.create(
+        distribution=distribution, upstream_pulp=server, last_updated=parse_datetime(last_updated)
+    )
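Note (not part of the patch): get_last_updated() above compares the ISO 8601 strings returned by the upstream API, so the ">" comparison orders them chronologically as long as both values use the same format. A small sketch with invented timestamps:

# Illustrative sketch only -- not part of the patch; the timestamps below are made up.
from pulpcore.app.replica import get_last_updated

upstream_repository = {"pulp_last_updated": "2024-07-30T21:41:05.123456Z"}
upstream_distribution = {"pulp_last_updated": "2024-07-29T10:00:00.000000Z"}

# The newer of the two timestamps wins; without a repository, the distribution's is used.
assert (
    get_last_updated(upstream_repository, upstream_distribution)
    == "2024-07-30T21:41:05.123456Z"
)
assert get_last_updated(None, upstream_distribution) == "2024-07-29T10:00:00.000000Z"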
diff --git a/pulpcore/app/serializers/__init__.py b/pulpcore/app/serializers/__init__.py
index 0d8be05b3bb..dd03bef8314 100644
--- a/pulpcore/app/serializers/__init__.py
+++ b/pulpcore/app/serializers/__init__.py
@@ -115,4 +115,4 @@
     UserRoleSerializer,
     UserSerializer,
 )
-from .replica import UpstreamPulpSerializer
+from .replica import UpstreamPulpSerializer, DistributionUpdatedTimestampSerializer
diff --git a/pulpcore/app/serializers/replica.py b/pulpcore/app/serializers/replica.py
index af3206ce1bf..9c09ff03614 100644
--- a/pulpcore/app/serializers/replica.py
+++ b/pulpcore/app/serializers/replica.py
@@ -3,14 +3,30 @@
 from rest_framework import serializers
 from rest_framework.validators import UniqueValidator
 
-from pulpcore.app.serializers import HiddenFieldsMixin
 from pulpcore.app.serializers import (
+    DetailRelatedField,
+    HiddenFieldsMixin,
     IdentityField,
     ModelSerializer,
 )
 
-from pulpcore.app.models import UpstreamPulp
+from pulpcore.app.models import UpstreamPulp, DistributionUpdatedTimestamp, Distribution
+
+
+class DistributionUpdatedTimestampSerializer(ModelSerializer):
+    """
+    Serializer for recording the last update timestamps of the upstream server's distributions.
+    """
+
+    distribution = DetailRelatedField(
+        view_name_pattern=r"distributions(-.*/.*)-detail",
+        queryset=Distribution.objects.all(),
+    )
+
+    class Meta:
+        model = DistributionUpdatedTimestamp
+        fields = ("distribution", "last_updated")
 
 
 class UpstreamPulpSerializer(ModelSerializer, HiddenFieldsMixin):
@@ -85,6 +101,13 @@ class UpstreamPulpSerializer(ModelSerializer, HiddenFieldsMixin):
         required=False,
     )
 
+    last_updated_timestamps = DistributionUpdatedTimestampSerializer(
+        help_text=_("A list of update timestamps for replicated distributions"),
+        source="distributionupdatedtimestamp_set",
+        many=True,
+        read_only=True,
+    )
+
     class Meta:
         abstract = True
         model = UpstreamPulp
@@ -96,6 +119,7 @@ class Meta:
             "ca_cert",
             "client_cert",
             "client_key",
+            "last_updated_timestamps",
             "tls_validation",
             "username",
             "password",
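Note (not part of the patch): a rough idea of how the new read-only field could appear in a serialized UpstreamPulp. The hrefs and timestamps below are made up, and the exact distribution href format depends on the plugin serving the replica.

# Illustrative sketch only -- not part of the patch; all values are made up.
upstream_pulp_representation = {
    "name": "upstream",
    "base_url": "https://upstream.example.com",
    "last_updated_timestamps": [
        {
            "distribution": "/pulp/api/v3/distributions/file/file/0190a1b2-.../",
            "last_updated": "2024-07-30T21:41:05.123456Z",
        },
    ],
}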
+ """ + + distribution = DetailRelatedField( + view_name_pattern=r"distributions(-.*/.*)-detail", + queryset=Distribution.objects.all(), + ) + + class Meta: + model = DistributionUpdatedTimestamp + fields = ("distribution", "last_updated") class UpstreamPulpSerializer(ModelSerializer, HiddenFieldsMixin): @@ -85,6 +101,13 @@ class UpstreamPulpSerializer(ModelSerializer, HiddenFieldsMixin): required=False, ) + last_updated_timestamps = DistributionUpdatedTimestampSerializer( + help_text=_("A list of update timestamps for replicated distributions"), + source="distributionupdatedtimestamp_set", + many=True, + read_only=True, + ) + class Meta: abstract = True model = UpstreamPulp @@ -96,6 +119,7 @@ class Meta: "ca_cert", "client_cert", "client_key", + "last_updated_timestamps", "tls_validation", "username", "password", diff --git a/pulpcore/app/tasks/replica.py b/pulpcore/app/tasks/replica.py index 53a37973bfa..c44eebac5ec 100644 --- a/pulpcore/app/tasks/replica.py +++ b/pulpcore/app/tasks/replica.py @@ -4,8 +4,10 @@ import sys from tempfile import NamedTemporaryFile +from django.utils.dateparse import parse_datetime + from pulpcore.app.apps import pulp_plugin_configs -from pulpcore.app.models import UpstreamPulp, TaskGroup +from pulpcore.app.models import Distribution, DistributionUpdatedTimestamp, UpstreamPulp, TaskGroup from pulpcore.app.replica import ReplicaContext from pulp_glue.common import __version__ as pulp_glue_version @@ -78,19 +80,57 @@ def replicate_distributions(server_pk): remote = replicator.create_or_update_remote(upstream_distribution=distro) if not remote: # The upstream distribution is not serving any content, - # let if fall throug the cracks and be cleanup below. + # let if fall through the cracks and be cleanup below. continue # Check if there is already a repository repository = replicator.create_or_update_repository(remote=remote) - # Dispatch a sync task - replicator.sync(repository, remote) + repo = replicator.upstream_repository(distro) + + # Dispatch a sync task if needed + if _requires_syncing(server, repo, distro, replicator.distribution_model_cls): + replicator.sync(repository, remote) - # Get or create a distribution - replicator.create_or_update_distribution(repository, distro) + # Get or create a distribution + replicator.create_or_update_distribution(server, repository, repo, distro) - # Add name to the list of known distribution names - distro_names.append(distro["name"]) + # Add name to the list of known distribution names + distro_names.append(distro["name"]) replicator.remove_missing(distro_names) task_group.finish() + + +def _requires_syncing(server, repo, distro, distro_model): + try: + local_distribution = Distribution.objects.get( + name=distro["name"], pulp_domain=server.domain + ) + except Distribution.DoesNotExist: + return True + + try: + updated_timestamp = DistributionUpdatedTimestamp.objects.get( + distribution=local_distribution, upstream_pulp=server + ) + except DistributionUpdatedTimestamp.DoesNotExist: + return True + + if distro.get("publication") or distro.get("repository_version"): + if updated_timestamp.last_updated == parse_datetime(distro["pulp_last_updated"]): + return False + + if repo and updated_timestamp.last_updated != parse_datetime(repo["pulp_last_updated"]): + return True + + # This has a few limitations. It is hard to explore the space of all possible publications that + # could be created after committing a new repository version. 
diff --git a/pulpcore/tests/functional/api/test_replication.py b/pulpcore/tests/functional/api/test_replication.py
index cfdcb4e6d3a..459bed51b56 100644
--- a/pulpcore/tests/functional/api/test_replication.py
+++ b/pulpcore/tests/functional/api/test_replication.py
@@ -3,8 +3,9 @@
 
 from pulpcore.client.pulpcore import ApiException
 from pulpcore.client.pulpcore import AsyncOperationResponse
+from pulpcore.client.pulp_file import RepositorySyncURL
 
-from pulpcore.tests.functional.utils import PulpTaskGroupError
+from pulpcore.tests.functional.utils import PulpTaskGroupError, generate_iso
 
 
 @pytest.mark.parallel
@@ -227,6 +228,126 @@ def test_replication_with_wrong_ca_cert(
         assert task.state == "completed"
 
 
+@pytest.mark.parallel
+def test_replication_optimization(
+    domain_factory,
+    bindings_cfg,
+    pulpcore_bindings,
+    pulp_settings,
+    file_bindings,
+    file_repository_factory,
+    file_remote_factory,
+    file_distribution_factory,
+    file_publication_factory,
+    basic_manifest_path,
+    monitor_task,
+    monitor_task_group,
+    gen_object_with_cleanup,
+    tmp_path,
+):
+    non_default_domain = domain_factory()
+    source_domain = domain_factory()
+    upstream_pulp_body = {
+        "name": str(uuid.uuid4()),
+        "base_url": bindings_cfg.host,
+        "api_root": pulp_settings.API_ROOT,
+        "domain": source_domain.name,
+        "username": bindings_cfg.username,
+        "password": bindings_cfg.password,
+    }
+    upstream_pulp = gen_object_with_cleanup(
+        pulpcore_bindings.UpstreamPulpsApi, upstream_pulp_body, pulp_domain=non_default_domain.name
+    )
+
+    # sync a repository on the "remote" Pulp instance
+    upstream_remote = file_remote_factory(
+        pulp_domain=source_domain.name, manifest_path=basic_manifest_path, policy="immediate"
+    )
+    upstream_repository = file_repository_factory(pulp_domain=source_domain.name)
+
+    repository_sync_data = RepositorySyncURL(remote=upstream_remote.pulp_href, mirror=True)
+    response = file_bindings.RepositoriesFileApi.sync(
+        upstream_repository.pulp_href, repository_sync_data
+    )
+    monitor_task(response.task)
+    upstream_repository = file_bindings.RepositoriesFileApi.read(upstream_repository.pulp_href)
+    upstream_publication = file_publication_factory(
+        pulp_domain=source_domain.name, repository_version=upstream_repository.latest_version_href
+    )
+    upstream_distribution = file_distribution_factory(
+        pulp_domain=source_domain.name, publication=upstream_publication.pulp_href
+    )
+
+    # replicate the "remote" instance
+    response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href)
+    task_group = monitor_task_group(response.task_group)
+    for task in task_group.tasks:
+        assert task.state == "completed"
+
+    distribution = file_bindings.DistributionsFileApi.list(
+        name=upstream_distribution.name,
+        pulp_domain=non_default_domain.name,
+    ).results[0]
+
+    upstream_pulp = pulpcore_bindings.UpstreamPulpsApi.read(upstream_pulp.pulp_href)
+
+    for timestamp in upstream_pulp.last_updated_timestamps:
+        if timestamp.distribution == distribution.pulp_href:
+            assert upstream_distribution.pulp_last_updated == timestamp.last_updated
+            break
+    else:
+        assert False, "The replica of the upstream distribution was not found"
+
+    # replicate the "remote" instance to check if the timestamp was NOT changed
+    response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href)
+    task_group = monitor_task_group(response.task_group)
+    for task in task_group.tasks:
+        assert task.state == "completed"
+
+    upstream_pulp = pulpcore_bindings.UpstreamPulpsApi.read(upstream_pulp.pulp_href)
+    for timestamp in upstream_pulp.last_updated_timestamps:
+        if timestamp.distribution == distribution.pulp_href:
+            assert upstream_distribution.pulp_last_updated == timestamp.last_updated
+            break
+
+    # upload new content to the repository on the "remote" Pulp instance (creating a new version)
+    filename = tmp_path / str(uuid.uuid4())
+    generate_iso(filename)
+    relative_path = "1.iso"
+
+    response = file_bindings.ContentFilesApi.create(
+        relative_path,
+        file=filename,
+        repository=upstream_repository.pulp_href,
+        pulp_domain=source_domain.name,
+    )
+    monitor_task(response.task)
+    upstream_repository = file_bindings.RepositoriesFileApi.read(upstream_repository.pulp_href)
+    upstream_publication = file_publication_factory(
+        pulp_domain=source_domain.name, repository_version=upstream_repository.latest_version_href
+    )
+    response = file_bindings.DistributionsFileApi.partial_update(
+        upstream_distribution.pulp_href,
+        {"publication": upstream_publication.pulp_href},
+    )
+    monitor_task(response.task)
+    upstream_distribution = file_bindings.DistributionsFileApi.read(upstream_distribution.pulp_href)
+
+    # replicate the "remote" instance to check if the timestamp was correctly changed
+    response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href)
+    task_group = monitor_task_group(response.task_group)
+    for task in task_group.tasks:
+        assert task.state == "completed"
+
+    upstream_pulp = pulpcore_bindings.UpstreamPulpsApi.read(upstream_pulp.pulp_href)
+    for timestamp in upstream_pulp.last_updated_timestamps:
+        if timestamp.distribution == distribution.pulp_href:
+            assert upstream_distribution.pulp_last_updated == timestamp.last_updated
+            break
+    else:
+        assert False, "The replica of the upstream distribution was not found"
+
+
 @pytest.fixture()
 def gen_users(gen_user):
     """Returns a user generator function for the tests."""