When aggregating, recalculate last record if partial (fixes DEV-62)
Until now, when aggregating, the last record of the aggregated time series
was ignored and not stored if it contained the MISS flag. This was because
more source time series data might become available later. However, it also
meant that if more source data never became available, the last aggregated
record would never be calculated. In addition, we sometimes want an estimate
of the current daily value a bit before midnight.

Now the last record is calculated, provided there is enough data (i.e. if
the number of missing values is at most max_missing), and it is recalculated
when more source data becomes available.
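
In outline, the new behaviour is roughly the following (a condensed sketch of
mine, not the committed code): before processing starts, a trailing aggregated
record that was computed with missing source values is dropped, so that it is
computed again from whatever source data now exists:

    def prepare_reprocessing(target_records):
        # target_records maps timestamps to (value, flags) tuples, e.g.
        # {"2019-05-21T19:00": (34.0, "MISSING2")}
        if not target_records:
            return
        last = max(target_records)
        _value, flags = target_records[last]
        if "MISS" in flags:
            del target_records[last]  # recalculated with any new source data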

This commit also upgrades pthelma to 2.1 and, correspondingly, changes the
missing flag from "MISS" to "MISSINGn", where n is the number of missing
values in the source data (e.g. "MISSING3"). Existing records are not
migrated; this lack of migration should not cause any problems with
aggregation.
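
The semantics of the new flag can be illustrated with plain pandas (an
independent sketch, not pthelma code; max_missing and the time steps are
assumptions for the example):

    import pandas as pd

    max_missing = 2   # corresponds to the Aggregation.max_missing field
    per_window = 6    # six 10-minute values per hourly window

    # Ten 10-minute values; the last hourly window has only 4 of its 6 values.
    index = pd.date_range("2019-05-21 17:10", periods=10, freq="10min", tz="UTC")
    source = pd.Series(range(1, 11), index=index, dtype=float)

    aggregated = {}
    for timestamp, window in source.resample("1h", closed="right", label="right"):
        n_missing = per_window - len(window)
        if n_missing > max_missing:
            continue  # too many missing values; don't aggregate this window
        flag = f"MISSING{n_missing}" if n_missing else ""
        aggregated[timestamp] = (window.sum(), flag)
    # aggregated[... 18:00] == (21.0, ""); aggregated[... 19:00] == (34.0, "MISSING2")
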
aptiko committed Jan 10, 2025
1 parent 37cd9d6 commit 44e0d56
Showing 4 changed files with 149 additions and 30 deletions.
46 changes: 23 additions & 23 deletions enhydris/autoprocess/models.py
@@ -463,6 +463,27 @@ def save(self, force_insert=False, force_update=False, *args, **kwargs):
         self._check_resulting_timestamp_offset()
         super().save(force_insert, force_update, *args, **kwargs)
 
+    def _get_start_date(self):
+        if self._last_target_timeseries_record_needs_recalculation():
+            # NOTE:
+            # Running ...latest().delete() won't work, maybe because currently
+            # TimeseriesRecord has some primary key hacks.
+            adate = self.target_timeseries.timeseriesrecord_set.latest().timestamp
+            self.target_timeseries.timeseriesrecord_set.filter(timestamp=adate).delete()
+            self.target_timeseries.save()
+        return super()._get_start_date()
+
+    def _last_target_timeseries_record_needs_recalculation(self):
+        # No recalculation needed if the last record doesn't have a "MISSING" flag.
+        if "MISS" not in self.target_timeseries.get_last_record_as_string():
+            return False
+
+        # Technically we should examine the number of missing values (given in the
+        # flag) and whether more data has become available for that date. But this
+        # would be quite complicated, so we won't do it. The worst that can happen
+        # is that the last aggregated record gets unnecessarily recalculated a few times.
+        return True
+
     def _check_resulting_timestamp_offset(self):
         if not self.resulting_timestamp_offset:
             return
@@ -489,7 +510,7 @@ def process_timeseries(self):
             logging.getLogger("enhydris.autoprocess").error(str(e))
             return HTimeseries()
         aggregated = self._aggregate_time_series(regularized)
-        return self._trim_last_record_if_not_complete(aggregated)
+        return aggregated
 
     def _regularize_time_series(self, source_htimeseries):
         mode = self.method == "mean" and RM.INSTANTANEOUS or RM.INTERVAL
@@ -509,6 +530,7 @@ def _aggregate_time_series(self, source_htimeseries):
             self.method,
             min_count=min_count,
             target_timestamp_offset=self.resulting_timestamp_offset or None,
+            missing_flag="MISSING{}",
         )
 
     def _get_source_step(self, source_htimeseries):
@@ -524,25 +546,3 @@ def _divide_target_step_by_source_step(self, source_step, target_step):
         return int(
             pd.Timedelta(target_step) / pd.tseries.frequencies.to_offset(source_step)
         )
-
-    def _trim_last_record_if_not_complete(self, ahtimeseries):
-        # If the very last record of the time series has the "MISS" flag, it means it
-        # was derived with one or more missing values in the source. We don't want to
-        # leave such a record at the end of the target time series, or it won't be
-        # recalculated when more data becomes available, because processing begins at
-        # the record following the last existing one.
-        if self._last_target_record_needs_trimming(ahtimeseries):
-            ahtimeseries.data = ahtimeseries.data[:-1]
-        return ahtimeseries
-
-    def _last_target_record_needs_trimming(self, ahtimeseries):
-        if len(ahtimeseries.data.index) == 0:
-            return False
-        last_target_record = ahtimeseries.data.iloc[-1]
-        last_target_record_date = last_target_record.name + pd.Timedelta(
-            self.resulting_timestamp_offset
-        )
-        return (
-            "MISS" in last_target_record["flags"]
-            and self.source_end_date < last_target_record_date
-        )
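
For reference, the step arithmetic feeding min_count above (the exact
expression for min_count is collapsed in this diff, so the last line here is
an assumption):

    import pandas as pd

    source_step, target_step, max_missing = "10min", "1h", 2
    values_per_window = int(
        pd.Timedelta(target_step) / pd.tseries.frequencies.to_offset(source_step)
    )  # 6, as in _divide_target_step_by_source_step()
    min_count = values_per_window - max_missing  # presumably 4: aggregate a
    # window only if at least 4 of its 6 source values are present
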
126 changes: 122 additions & 4 deletions enhydris/autoprocess/tests/test_models/test_aggregation.py
@@ -2,16 +2,17 @@
 from unittest import mock
 
 from django.db import IntegrityError
-from django.test import TestCase
+from django.test import TestCase, override_settings
 
 import numpy as np
 import pandas as pd
 from haggregate import RegularizationMode as RM
 from htimeseries import HTimeseries
 from model_bakery import baker
 
-from enhydris.autoprocess.models import Aggregation
+from enhydris.autoprocess.models import Aggregation, AutoProcess
 from enhydris.models import Station, Timeseries, TimeseriesGroup, Variable
+from enhydris.tests.test_models.test_timeseries import get_tzinfo
 
 
 class AggregationTestCase(TestCase):
@@ -197,7 +198,7 @@ class AggregationProcessTimeseriesTestCase(TestCase):
     )
 
     expected_result_for_max_missing_one = pd.DataFrame(
-        data={"value": [56.0, 157.0], "flags": ["", "MISS"]},
+        data={"value": [56.0, 157.0], "flags": ["", "MISSING1"]},
         columns=["value", "flags"],
         index=[
             dt.datetime(2019, 5, 21, 10, 59, tzinfo=dt.timezone.utc),
@@ -206,12 +207,16 @@
         ],
     )
 
     expected_result_for_max_missing_five = pd.DataFrame(
-        data={"value": [2.0, 56.0, 157.0], "flags": ["MISS", "", "MISS"]},
+        data={
+            "value": [2.0, 56.0, 157.0, 202.0],
+            "flags": ["MISSING5", "", "MISSING1", "MISSING2"],
+        },
         columns=["value", "flags"],
         index=[
             dt.datetime(2019, 5, 21, 9, 59, tzinfo=dt.timezone.utc),
             dt.datetime(2019, 5, 21, 10, 59, tzinfo=dt.timezone.utc),
             dt.datetime(2019, 5, 21, 11, 59, tzinfo=dt.timezone.utc),
+            dt.datetime(2019, 5, 21, 12, 59, tzinfo=dt.timezone.utc),
         ],
     )
 
@@ -352,3 +357,116 @@ def test_max(self, mock_regularize, mock_haggregate):
         self.aggregation.method = "max"
         self.aggregation.process_timeseries()
         self.assertEqual(mock_regularize.call_args.kwargs["mode"], RM.INTERVAL)
+
+
+@override_settings(
+    CACHES={"default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}}
+)
+class AggregationRecalculatesLastValueIfNeededTestCase(TestCase):
+    """
+    This test case uses this source time series:
+    2019-05-21 17:00:00+02:00  0.0
+    2019-05-21 17:10:00+02:00  1.0
+    2019-05-21 17:20:00+02:00  2.0
+    2019-05-21 17:30:00+02:00  3.0
+    2019-05-21 17:40:00+02:00  4.0
+    2019-05-21 17:50:00+02:00  5.0
+    2019-05-21 18:00:00+02:00  6.0
+    2019-05-21 18:10:00+02:00  7.0
+    2019-05-21 18:20:00+02:00  8.0
+    2019-05-21 18:30:00+02:00  9.0
+    2019-05-21 18:40:00+02:00 10.0
+    It aggregates to hourly; the last aggregated record (19:00) has two
+    missing values in the source time series, and therefore the MISSING2
+    flag. Subsequently these two records are added:
+    2019-05-21 18:50:00+02:00 11.0
+    2019-05-21 19:00:00+02:00 12.0
+    Then aggregation is repeated, and it is verified that the aggregated
+    record at 19:00 is recalculated as needed.
+    """
+
+    def setUp(self):
+        station = baker.make(Station, name="Hobbiton", display_timezone="Etc/GMT-2")
+        timeseries_group = baker.make(
+            TimeseriesGroup,
+            gentity=station,
+            variable__descr="h",
+            precision=0,
+        )
+        source_timeseries = baker.make(
+            Timeseries,
+            timeseries_group=timeseries_group,
+            type=Timeseries.CHECKED,
+            time_step="10min",
+        )
+        start_date = dt.datetime(2019, 5, 21, 17, 0, tzinfo=get_tzinfo("Etc/GMT-2"))
+        index = [start_date + dt.timedelta(minutes=i) for i in range(0, 110, 10)]
+        values = [float(x) for x in range(11)]
+        flags = [""] * 11
+        source_timeseries.set_data(
+            pd.DataFrame(
+                data={"value": values, "flags": flags},
+                columns=["value", "flags"],
+                index=index,
+            )
+        )
+        aggregation = Aggregation(
+            timeseries_group=timeseries_group,
+            target_time_step="1h",
+            method="sum",
+            max_missing=2,
+            resulting_timestamp_offset="",
+        )
+        super(AutoProcess, aggregation).save()  # Avoid triggering a celery task
+        self.aggregation_id = aggregation.id
+
+    def test_initial_target_timeseries(self):
+        aggregation = Aggregation.objects.get(id=self.aggregation_id)
+        aggregation.execute()
+        actual_data = aggregation.target_timeseries.get_data().data
+        expected_data = pd.DataFrame(
+            data={"value": [21.0, 34.0], "flags": ["", "MISSING2"]},
+            columns=["value", "flags"],
+            index=[
+                dt.datetime(2019, 5, 21, 18, 0, tzinfo=get_tzinfo("Etc/GMT-2")),
+                dt.datetime(2019, 5, 21, 19, 0, tzinfo=get_tzinfo("Etc/GMT-2")),
+            ],
+        )
+        expected_data.index.name = "date"
+        pd.testing.assert_frame_equal(actual_data, expected_data)
+
+    def test_updated_target_timeseries(self):
+        Aggregation.objects.get(id=self.aggregation_id).execute()
+
+        self._extend_source_timeseries()
+        aggregation = Aggregation.objects.get(id=self.aggregation_id)
+        aggregation.execute()
+
+        ahtimeseries = aggregation.target_timeseries.get_data()
+        expected_data = pd.DataFrame(
+            data={"value": [21.0, 57.0], "flags": ["", ""]},
+            columns=["value", "flags"],
+            index=[
+                dt.datetime(2019, 5, 21, 18, 0, tzinfo=get_tzinfo("Etc/GMT-2")),
+                dt.datetime(2019, 5, 21, 19, 0, tzinfo=get_tzinfo("Etc/GMT-2")),
+            ],
+        )
+        expected_data.index.name = "date"
+        pd.testing.assert_frame_equal(ahtimeseries.data, expected_data)
+
+    def _extend_source_timeseries(self):
+        aggregation = Aggregation.objects.get(id=self.aggregation_id)
+        source_timeseries = aggregation.source_timeseries
+        new_values = [11.0, 12.0]
+        new_flags = ["", ""]
+        end_date = source_timeseries.end_date
+        delta = dt.timedelta
+        new_dates = [end_date + delta(minutes=10), end_date + delta(minutes=20)]
+        new_data = pd.DataFrame(
+            data={"value": new_values, "flags": new_flags},
+            columns=["value", "flags"],
+            index=new_dates,
+        )
+        source_timeseries.append_data(new_data)
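
The expected values in these tests check out by hand (a quick verification of
mine, not part of the commit):

    first_hour = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]    # 17:10-18:00
    partial = [7.0, 8.0, 9.0, 10.0]                # 18:10-18:40, 2 of 6 missing
    completed = [7.0, 8.0, 9.0, 10.0, 11.0, 12.0]  # 18:10-19:00, complete

    assert sum(first_hour) == 21.0   # the 18:00 record, no flag
    assert sum(partial) == 34.0      # the initial 19:00 record, flag MISSING2
    assert sum(completed) == 57.0    # the recalculated 19:00 record
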
5 changes: 3 additions & 2 deletions enhydris/models/timeseries.py
@@ -348,13 +348,14 @@ def save(self, force_insert=False, force_update=False, *args, **kwargs):
 
 
 class TimeseriesRecord(models.Model):
-    # Ugly primary key hack.
+    # Ugly primary key hack - FIXME in Django 5.2.
     # Django does not allow composite primary keys, whereas timescaledb can't work
     # without them. Our composite primary key in this case is (timeseries, timestamp).
     # What we do is set managed=False, so that Django won't create the table itself;
     # we create it with migrations.RunSQL(). We also set "primary_key=True" in one of
     # the fields. While technically this is wrong, it fools Django into not expecting
-    # an "id" field to exist, and it doesn't affect querying functionality.
+    # an "id" field to exist, and it doesn't affect querying functionality (except in
+    # one case in autoprocess.models.Aggregation._get_start_date(); see comment there).
     timeseries = models.ForeignKey(Timeseries, on_delete=models.CASCADE)
     timestamp = models.DateTimeField(primary_key=True, verbose_name=_("Timestamp"))
     value = models.FloatField(blank=True, null=True, verbose_name=_("Value"))
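
The comment above says the table is created with migrations.RunSQL(). A
hypothetical sketch of such a migration, with the SQL, names, and dependency
inferred from the model (illustrative only; not the project's actual
migration):

    from django.db import migrations

    CREATE_TABLE = """
        CREATE TABLE enhydris_timeseriesrecord (
            timeseries_id INTEGER NOT NULL
                REFERENCES enhydris_timeseries (id) ON DELETE CASCADE,
            "timestamp" TIMESTAMPTZ NOT NULL,
            value DOUBLE PRECISION NULL,
            flags TEXT NOT NULL,
            PRIMARY KEY (timeseries_id, "timestamp")
        );
        SELECT create_hypertable('enhydris_timeseriesrecord', 'timestamp');
    """

    class Migration(migrations.Migration):
        dependencies = [("enhydris", "0001_initial")]  # hypothetical
        operations = [
            migrations.RunSQL(
                CREATE_TABLE,
                reverse_sql="DROP TABLE enhydris_timeseriesrecord;",
            )
        ]
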
2 changes: 1 addition & 1 deletion requirements.txt
@@ -16,5 +16,5 @@ django-simple-captcha>=0.5.14,<1
 django-bootstrap4>=2,<4
 requests>=2.25,<3
 defusedxml>=0.7.1,<1
-pthelma[all]>=2,<3
+pthelma[all]>=2.1,<3
 matplotlib>=3,<3.7
