Skip to content

Commit

Permalink
Skip syncing when replicated distributions were not updated
Browse files Browse the repository at this point in the history
closes #5493
  • Loading branch information
lubosmj committed Jul 31, 2024
1 parent c0c6223 commit 3102d8a
Show file tree
Hide file tree
Showing 9 changed files with 336 additions and 23 deletions.
3 changes: 3 additions & 0 deletions CHANGES/5493.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Optimized the replication mechanism to skip syncing when replicated distributions were not updated.
Generally, the optimization is applied only to distributions that have publications or repository
versions attached to their upstream equivalents.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Generated by Django 4.2.11 on 2024-07-30 21:41

from django.db import migrations, models
import django.db.models.deletion
import django_lifecycle.mixins
import pulpcore.app.models.base


class Migration(migrations.Migration):
    """Create ``DistributionUpdatedTimestamp`` and attach it to ``UpstreamPulp`` as a through-model."""

    dependencies = [
        ("core", "0121_add_profile_artifacts_table"),
    ]

    operations = [
        # New table holding one timestamp per (distribution, upstream_pulp) row.
        migrations.CreateModel(
            name="DistributionUpdatedTimestamp",
            fields=[
                (
                    "pulp_id",
                    models.UUIDField(
                        default=pulpcore.app.models.base.pulp_uuid,
                        editable=False,
                        primary_key=True,
                        serialize=False,
                    ),
                ),
                ("pulp_created", models.DateTimeField(auto_now_add=True)),
                ("pulp_last_updated", models.DateTimeField(auto_now=True, null=True)),
                ("last_updated", models.DateTimeField()),
                (
                    "distribution",
                    models.ForeignKey(
                        on_delete=django.db.models.deletion.CASCADE,
                        to="core.distribution",
                    ),
                ),
                (
                    "upstream_pulp",
                    models.ForeignKey(
                        on_delete=django.db.models.deletion.CASCADE,
                        to="core.upstreampulp",
                    ),
                ),
            ],
            options={
                "abstract": False,
            },
            bases=(django_lifecycle.mixins.LifecycleModelMixin, models.Model),
        ),
        # Many-to-many from UpstreamPulp to Distribution, routed through the table above.
        migrations.AddField(
            model_name="upstreampulp",
            name="last_updated_timestamps",
            field=models.ManyToManyField(
                through="core.DistributionUpdatedTimestamp",
                to="core.distribution",
            ),
        ),
    ]
2 changes: 1 addition & 1 deletion pulpcore/app/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,4 @@
from .progress import GroupProgressReport, ProgressReport

# Moved here to avoid a circular import with GroupProgressReport
from .replica import UpstreamPulp
from .replica import UpstreamPulp, DistributionUpdatedTimestamp
17 changes: 16 additions & 1 deletion pulpcore/app/models/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@
"""

from django.db import models
from pulpcore.plugin.models import BaseModel, EncryptedTextField, AutoAddObjPermsMixin
from pulpcore.app.util import get_domain_pk
from pulpcore.plugin.models import (
AutoAddObjPermsMixin,
BaseModel,
Distribution,
EncryptedTextField,
)


class UpstreamPulp(BaseModel, AutoAddObjPermsMixin):
Expand All @@ -28,9 +33,19 @@ class UpstreamPulp(BaseModel, AutoAddObjPermsMixin):

pulp_label_select = models.TextField(null=True)

last_updated_timestamps = models.ManyToManyField(
Distribution, through="DistributionUpdatedTimestamp"
)

class Meta:
unique_together = ("name", "pulp_domain")
permissions = [
("replicate_upstreampulp", "Can start a replication task"),
("manage_roles_upstreampulp", "Can manage roles on upstream pulps"),
]


class DistributionUpdatedTimestamp(BaseModel):
    """
    Through-model of ``UpstreamPulp.last_updated_timestamps``.

    Each row ties one ``Distribution`` to one ``UpstreamPulp`` and stores a timestamp for
    that pair.

    Fields:
        distribution (models.ForeignKey): The distribution; rows are removed with it.
        upstream_pulp (models.ForeignKey): The upstream server; rows are removed with it.
        last_updated (models.DateTimeField): The timestamp recorded for the pair.
    """

    distribution = models.ForeignKey(to=Distribution, on_delete=models.CASCADE)
    upstream_pulp = models.ForeignKey(to=UpstreamPulp, on_delete=models.CASCADE)
    last_updated = models.DateTimeField()
92 changes: 83 additions & 9 deletions pulpcore/app/replica.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
from django.db.models import Model
import logging

from django.db.models import Model
from django.utils.dateparse import parse_datetime

from pulp_glue.common.context import PulpContext
from pulpcore.tasking.tasks import dispatch
from pulpcore.app.tasks.base import (
general_update,
general_create,
general_multi_delete,
)
from pulpcore.app.models import (
Distribution,
DistributionUpdatedTimestamp,
UpstreamPulp,
)

from pulpcore.plugin.util import get_url, get_domain

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -72,11 +80,16 @@ def needs_update(fields_dict, model_instance):
needs_update = True
return needs_update

def upstream_repository(self, upstream_distribution):
    """
    Look up the upstream repository an upstream distribution refers to.

    Args:
        upstream_distribution (dict): Mapping describing the upstream distribution. Only its
            ``"repository"`` entry is read.

    Returns:
        Whatever ``show()`` of the repository context returns for the referenced repository,
        or ``None`` when the distribution has no (or an empty) ``"repository"`` entry.
    """
    # A missing key is treated like an empty value instead of raising KeyError.
    repository_href = upstream_distribution.get("repository")
    if not repository_href:
        return None
    return self.repository_ctx_cls(self.pulp_ctx).show(repository_href)

def upstream_distributions(self, labels=None):
if labels:
params = {"pulp_label_select": labels}
else:
params = {}

offset = 0
list_size = 100
while True:
Expand Down Expand Up @@ -159,36 +172,61 @@ def distribution_data(self, repository, upstream_distribution):
"base_path": upstream_distribution["base_path"],
}

def create_or_update_distribution(self, repository, upstream_distribution):
def create_or_update_distribution(
self, server, repository, upstream_repository, upstream_distribution
):
distribution_data = self.distribution_data(repository, upstream_distribution)
last_updated = get_last_updated(upstream_repository, upstream_distribution)
try:
distro = self.distribution_model_cls.objects.get(
name=upstream_distribution["name"], pulp_domain=self.domain
)
# Check that the distribution has the right repository associated
try:
distribution_timestamp = DistributionUpdatedTimestamp.objects.get(
distribution=distro,
upstream_pulp=server,
)
except DistributionUpdatedTimestamp.DoesNotExist:
distribution_timestamp = DistributionUpdatedTimestamp.objects.create(
distribution=distro,
upstream_pulp=server,
last_updated=parse_datetime(last_updated),
)
needs_update = self.needs_update(distribution_data, distro)
if needs_update:
if needs_update or parse_datetime(last_updated) != distribution_timestamp.last_updated:
# Update the distribution
dispatch(
general_update,
distribution_update,
task_group=self.task_group,
shared_resources=[repository],
exclusive_resources=self.distros_uris,
args=(distro.pk, self.app_label, self.distribution_serializer_name),
args=(
distro.pk,
self.app_label,
self.distribution_serializer_name,
server.pk,
last_updated,
),
kwargs={
"data": distribution_data,
"partial": True,
},
)
except self.distribution_model_cls.DoesNotExist:
# Dispatch a task to create the distribution
distribution_data["name"] = upstream_distribution["name"]
distribution_name = distribution_data["name"] = upstream_distribution["name"]
dispatch(
general_create,
distribution_create,
task_group=self.task_group,
shared_resources=[repository],
exclusive_resources=self.distros_uris,
args=(self.app_label, self.distribution_serializer_name),
args=(
self.app_label,
self.distribution_serializer_name,
distribution_name,
server.pk,
last_updated,
),
kwargs={"data": distribution_data},
)

Expand Down Expand Up @@ -246,3 +284,39 @@ def remove_missing(self, names):
exclusive_resources=repositories + remotes,
args=(repository_ids + remote_ids,),
)


def get_last_updated(upstream_repository, upstream_distribution):
    """
    Return the most recent ``pulp_last_updated`` value of a distribution and its repository.

    Args:
        upstream_repository (dict | None): Upstream repository data, or a falsy value when the
            distribution has no repository.
        upstream_distribution (dict): Upstream distribution data.

    Returns:
        The unmodified ``pulp_last_updated`` value of whichever object was updated last. When
        there is no repository, or on a tie, the distribution's value is returned.
    """
    from datetime import datetime

    def _as_datetime(stamp):
        # ``fromisoformat`` only understands a trailing "Z" from Python 3.11 on.
        return datetime.fromisoformat(stamp.replace("Z", "+00:00"))

    distribution_stamp = upstream_distribution["pulp_last_updated"]
    if not upstream_repository:
        return distribution_stamp

    repository_stamp = upstream_repository["pulp_last_updated"]
    try:
        # Compare points in time, not text: "...:00Z" sorts after "...:00.500000Z" as a
        # string although it denotes the earlier moment.
        repository_is_newer = _as_datetime(repository_stamp) > _as_datetime(distribution_stamp)
    except (TypeError, ValueError):
        # Unparseable or naive/aware-mixed stamps: keep the plain string comparison.
        repository_is_newer = repository_stamp > distribution_stamp

    return repository_stamp if repository_is_newer else distribution_stamp


def distribution_update(
    distro_pk, app_label, serializer_name, server_pk, last_updated, **kwargs
):
    """
    Update a distribution, then store the upstream timestamp for it.

    Args:
        distro_pk: Primary key of the distribution to update.
        app_label: Forwarded to ``general_update``.
        serializer_name: Forwarded to ``general_update``.
        server_pk: Primary key of the ``UpstreamPulp`` the timestamp belongs to.
        last_updated (str): Timestamp string to store; parsed by ``upstream_timestamp_update``.
        **kwargs: Forwarded unchanged to ``general_update``.
    """
    # The timestamp is written only after the update itself ran.
    general_update(distro_pk, app_label, serializer_name, **kwargs)
    upstream_timestamp_update(distro_pk, server_pk, last_updated)


def upstream_timestamp_update(distro_pk, server_pk, last_updated):
    """
    Overwrite the stored timestamp of a distribution for one upstream server.

    Args:
        distro_pk: Primary key of the distribution, looked up within the current domain.
        server_pk: Primary key of the ``UpstreamPulp``.
        last_updated (str): Timestamp string; parsed with ``parse_datetime`` before storing.

    Raises:
        Distribution.DoesNotExist: No such distribution in the current domain.
        UpstreamPulp.DoesNotExist: No such upstream server.
    """
    local_distro = Distribution.objects.get(pk=distro_pk, pulp_domain=get_domain())
    upstream = UpstreamPulp.objects.get(pk=server_pk)
    # Bulk update on the filtered rows; nothing is created if no row matches.
    matching_rows = upstream.distributionupdatedtimestamp_set.filter(distribution=local_distro)
    matching_rows.update(last_updated=parse_datetime(last_updated))


def distribution_create(
    app_label, serializer_name, distro_name, server_pk, last_updated, **kwargs
):
    """
    Create a distribution, then record the upstream timestamp for it.

    Args:
        app_label: Forwarded to ``general_create``.
        serializer_name: Forwarded to ``general_create``.
        distro_name: Name used afterwards to look the new distribution up.
        server_pk: Primary key of the ``UpstreamPulp`` the timestamp belongs to.
        last_updated (str): Timestamp string to store; parsed by ``upstream_timestamp_create``.
        **kwargs: Forwarded unchanged to ``general_create``.
    """
    # The distribution must exist before its timestamp row can reference it.
    general_create(app_label, serializer_name, **kwargs)
    upstream_timestamp_create(distro_name, server_pk, last_updated)


def upstream_timestamp_create(distribution_name, server_pk, last_updated):
    """
    Create the timestamp row of a distribution for one upstream server.

    Args:
        distribution_name: Name of the distribution, looked up within the current domain.
        server_pk: Primary key of the ``UpstreamPulp``.
        last_updated (str): Timestamp string; parsed with ``parse_datetime`` before storing.

    Raises:
        Distribution.DoesNotExist: No distribution of that name in the current domain.
        UpstreamPulp.DoesNotExist: No such upstream server.
    """
    local_distro = Distribution.objects.get(name=distribution_name, pulp_domain=get_domain())
    upstream = UpstreamPulp.objects.get(pk=server_pk)
    DistributionUpdatedTimestamp.objects.create(
        distribution=local_distro,
        upstream_pulp=upstream,
        last_updated=parse_datetime(last_updated),
    )
2 changes: 1 addition & 1 deletion pulpcore/app/serializers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,4 @@
UserRoleSerializer,
UserSerializer,
)
from .replica import UpstreamPulpSerializer
from .replica import UpstreamPulpSerializer, DistributionUpdatedTimestampSerializer
28 changes: 26 additions & 2 deletions pulpcore/app/serializers/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,30 @@
from rest_framework import serializers
from rest_framework.validators import UniqueValidator

from pulpcore.app.serializers import HiddenFieldsMixin
from pulpcore.app.serializers import (
DetailRelatedField,
HiddenFieldsMixin,
IdentityField,
ModelSerializer,
)


from pulpcore.app.models import UpstreamPulp
from pulpcore.app.models import UpstreamPulp, DistributionUpdatedTimestamp, Distribution


class DistributionUpdatedTimestampSerializer(ModelSerializer):
    """
    Serializer exposing a distribution together with its recorded ``last_updated`` timestamp.
    """

    # Rendered as a link to the distribution's detail view.
    distribution = DetailRelatedField(
        queryset=Distribution.objects.all(),
        view_name_pattern=r"distributions(-.*/.*)-detail",
    )

    class Meta:
        model = DistributionUpdatedTimestamp
        fields = (
            "distribution",
            "last_updated",
        )


class UpstreamPulpSerializer(ModelSerializer, HiddenFieldsMixin):
Expand Down Expand Up @@ -85,6 +101,13 @@ class UpstreamPulpSerializer(ModelSerializer, HiddenFieldsMixin):
required=False,
)

last_updated_timestamps = DistributionUpdatedTimestampSerializer(
help_text=_("A list of update timestamps for replicated distributions"),
source="distributionupdatedtimestamp_set",
many=True,
read_only=True,
)

class Meta:
abstract = True
model = UpstreamPulp
Expand All @@ -96,6 +119,7 @@ class Meta:
"ca_cert",
"client_cert",
"client_key",
"last_updated_timestamps",
"tls_validation",
"username",
"password",
Expand Down
56 changes: 48 additions & 8 deletions pulpcore/app/tasks/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import sys
from tempfile import NamedTemporaryFile

from django.utils.dateparse import parse_datetime

from pulpcore.app.apps import pulp_plugin_configs
from pulpcore.app.models import UpstreamPulp, TaskGroup
from pulpcore.app.models import Distribution, DistributionUpdatedTimestamp, UpstreamPulp, TaskGroup
from pulpcore.app.replica import ReplicaContext

from pulp_glue.common import __version__ as pulp_glue_version
Expand Down Expand Up @@ -78,19 +80,57 @@ def replicate_distributions(server_pk):
remote = replicator.create_or_update_remote(upstream_distribution=distro)
if not remote:
# The upstream distribution is not serving any content,
# let if fall throug the cracks and be cleanup below.
# let it fall through the cracks and be cleaned up below.
continue
# Check if there is already a repository
repository = replicator.create_or_update_repository(remote=remote)

# Dispatch a sync task
replicator.sync(repository, remote)
repo = replicator.upstream_repository(distro)

# Dispatch a sync task if needed
if _requires_syncing(server, repo, distro, replicator.distribution_model_cls):
replicator.sync(repository, remote)

# Get or create a distribution
replicator.create_or_update_distribution(repository, distro)
# Get or create a distribution
replicator.create_or_update_distribution(server, repository, repo, distro)

# Add name to the list of known distribution names
distro_names.append(distro["name"])
# Add name to the list of known distribution names
distro_names.append(distro["name"])

replicator.remove_missing(distro_names)
task_group.finish()


def _requires_syncing(server, repo, distro, distro_model):
    """
    Decide whether a sync task has to be dispatched for an upstream distribution.

    Args:
        server: The ``UpstreamPulp`` being replicated.
        repo: Upstream repository data (its ``"pulp_last_updated"`` is read), or a falsy
            value when there is none.
        distro (dict): Upstream distribution data; ``"name"``, ``"publication"``,
            ``"repository_version"`` and ``"pulp_last_updated"`` are read.
        distro_model: Distribution model class whose ``SERVE_FROM_PUBLICATION`` flag is
            consulted.

    Returns:
        bool: ``True`` when syncing is required, ``False`` when it can be skipped.
    """
    # NOTE(review): confirm ``server.domain`` is the intended value for the ``pulp_domain``
    # filter; UpstreamPulp's uniqueness constraint is declared on ``pulp_domain``.
    try:
        local_distribution = Distribution.objects.get(
            name=distro["name"], pulp_domain=server.domain
        )
    except Distribution.DoesNotExist:
        # Nothing was replicated under this name yet.
        return True

    try:
        updated_timestamp = DistributionUpdatedTimestamp.objects.get(
            distribution=local_distribution, upstream_pulp=server
        )
    except DistributionUpdatedTimestamp.DoesNotExist:
        # No recorded state to compare against.
        return True

    recorded = updated_timestamp.last_updated

    if distro.get("publication") or distro.get("repository_version"):
        # The distribution points at a publication/repository version directly, so its own
        # timestamp is the change signal. A changed timestamp must trigger a sync here and
        # not fall through to the plugin-capability check below, which could skip it.
        return recorded != parse_datetime(distro["pulp_last_updated"])

    if repo and recorded != parse_datetime(repo["pulp_last_updated"]):
        return True

    # This has a few limitations. It is hard to explore the space of all possible publications
    # that could be created after committing a new repository version. Thus, there might be a
    # case where a user published an older repository version and removed a publication for a
    # newer repository version, making the task of exploring the latest available publication
    # unmanageable within a reasonable time span. Also, the latest repository version may not
    # be published. Therefore, let's pretend that the vast majority of users rely on repository
    # versioning and publish latest repository versions in a distribution->repository setting.
    if distro_model.SERVE_FROM_PUBLICATION:
        return True

    return False
Loading

0 comments on commit 3102d8a

Please sign in to comment.