diff --git a/enhydris/autoprocess/models.py b/enhydris/autoprocess/models.py
index 7afeb370..2fbfd8ab 100644
--- a/enhydris/autoprocess/models.py
+++ b/enhydris/autoprocess/models.py
@@ -463,6 +463,27 @@ def save(self, force_insert=False, force_update=False, *args, **kwargs):
         self._check_resulting_timestamp_offset()
         super().save(force_insert, force_update, *args, **kwargs)
 
+    def _get_start_date(self):
+        if self._last_target_timeseries_record_needs_recalculation():
+            # NOTE:
+            # Running ...latest().delete() won't work, maybe because currently
+            # TimeseriesRecord has some primary key hacks.
+            adate = self.target_timeseries.timeseriesrecord_set.latest().timestamp
+            self.target_timeseries.timeseriesrecord_set.filter(timestamp=adate).delete()
+            self.target_timeseries.save()
+        return super()._get_start_date()
+
+    def _last_target_timeseries_record_needs_recalculation(self):
+        # No recalculation is needed if the last record doesn't have the "MISSING" flag.
+        if "MISS" not in self.target_timeseries.get_last_record_as_string():
+            return False
+
+        # Technically we should examine the number of missing values (given in the
+        # flag) and whether more data has become available for that date. But this
+        # would be quite complicated, so we don't do it. The worst that can happen is
+        # that the last aggregated record gets unnecessarily recalculated a few times.
+        return True
+
     def _check_resulting_timestamp_offset(self):
         if not self.resulting_timestamp_offset:
             return
@@ -489,7 +510,7 @@ def process_timeseries(self):
             logging.getLogger("enhydris.autoprocess").error(str(e))
             return HTimeseries()
         aggregated = self._aggregate_time_series(regularized)
-        return self._trim_last_record_if_not_complete(aggregated)
+        return aggregated
 
     def _regularize_time_series(self, source_htimeseries):
         mode = self.method == "mean" and RM.INSTANTANEOUS or RM.INTERVAL
@@ -509,6 +530,7 @@ def _aggregate_time_series(self, source_htimeseries):
             self.method,
             min_count=min_count,
             target_timestamp_offset=self.resulting_timestamp_offset or None,
+            missing_flag="MISSING{}",
         )
 
     def _get_source_step(self, source_htimeseries):
@@ -524,25 +546,3 @@ def _divide_target_step_by_source_step(self, source_step, target_step):
         return int(
             pd.Timedelta(target_step) / pd.tseries.frequencies.to_offset(source_step)
         )
-
-    def _trim_last_record_if_not_complete(self, ahtimeseries):
-        # If the very last record of the time series has the "MISS" flag, it means it
-        # was derived with one or more missing values in the source. We don't want to
-        # leave such a record at the end of the target time series, or it won't be
-        # re-calculated when more data becomes available, because processing begins at
-        # the record following the last existing one.
-        if self._last_target_record_needs_trimming(ahtimeseries):
-            ahtimeseries.data = ahtimeseries.data[:-1]
-        return ahtimeseries
-
-    def _last_target_record_needs_trimming(self, ahtimeseries):
-        if len(ahtimeseries.data.index) == 0:
-            return False
-        last_target_record = ahtimeseries.data.iloc[-1]
-        last_target_record_date = last_target_record.name + pd.Timedelta(
-            self.resulting_timestamp_offset
-        )
-        return (
-            "MISS" in last_target_record["flags"]
-            and self.source_end_date < last_target_record_date
-        )
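A side note on the _get_start_date() workaround above: the NOTE only says "maybe because ... primary key hacks", but the suspected failure mode is easy to demonstrate outside Django. Since TimeseriesRecord declares primary_key=True on timestamp alone (see the enhydris/models/timeseries.py hunk further down), an instance-level delete() can be expected to emit DELETE ... WHERE timestamp = X, which would also hit same-timestamp rows belonging to other time series, whereas the related-manager queryset additionally constrains on timeseries_id. The following standalone sqlite3 sketch illustrates that suspicion; the schema is a simplified hypothetical stand-in, not the real table created by migrations.RunSQL():

import sqlite3

# Simplified stand-in for the TimeseriesRecord table (hypothetical schema; the
# real one is created with migrations.RunSQL() for TimescaleDB).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE timeseriesrecord (timeseries_id INTEGER, timestamp TEXT, value REAL)"
)
rows = [
    (1, "2019-05-21 19:00", 34.0),  # the record we want to recalculate
    (2, "2019-05-21 19:00", 99.0),  # another time series, same timestamp
]
conn.executemany("INSERT INTO timeseriesrecord VALUES (?, ?, ?)", rows)

# What latest().delete() would presumably do: Django believes "timestamp" is the
# primary key, so it deletes by timestamp alone - and takes the other time
# series' record with it.
conn.execute("DELETE FROM timeseriesrecord WHERE timestamp = ?", ("2019-05-21 19:00",))
assert conn.execute("SELECT count(*) FROM timeseriesrecord").fetchone()[0] == 0

# What the queryset workaround does: the related manager also constrains on
# timeseries_id, so only the intended record goes away.
conn.executemany("INSERT INTO timeseriesrecord VALUES (?, ?, ?)", rows)
conn.execute(
    "DELETE FROM timeseriesrecord WHERE timeseries_id = ? AND timestamp = ?",
    (1, "2019-05-21 19:00"),
)
assert conn.execute("SELECT count(*) FROM timeseriesrecord").fetchone()[0] == 1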
diff --git a/enhydris/autoprocess/tests/test_models/test_aggregation.py b/enhydris/autoprocess/tests/test_models/test_aggregation.py
index 087799b3..c106a50f 100644
--- a/enhydris/autoprocess/tests/test_models/test_aggregation.py
+++ b/enhydris/autoprocess/tests/test_models/test_aggregation.py
@@ -2,7 +2,7 @@
 from unittest import mock
 
 from django.db import IntegrityError
-from django.test import TestCase
+from django.test import TestCase, override_settings
 
 import numpy as np
 import pandas as pd
@@ -10,8 +10,9 @@
 from htimeseries import HTimeseries
 from model_bakery import baker
 
-from enhydris.autoprocess.models import Aggregation
+from enhydris.autoprocess.models import Aggregation, AutoProcess
 from enhydris.models import Station, Timeseries, TimeseriesGroup, Variable
+from enhydris.tests.test_models.test_timeseries import get_tzinfo
 
 
 class AggregationTestCase(TestCase):
@@ -197,7 +198,7 @@ class AggregationProcessTimeseriesTestCase(TestCase):
     )
 
     expected_result_for_max_missing_one = pd.DataFrame(
-        data={"value": [56.0, 157.0], "flags": ["", "MISS"]},
+        data={"value": [56.0, 157.0], "flags": ["", "MISSING1"]},
         columns=["value", "flags"],
         index=[
             dt.datetime(2019, 5, 21, 10, 59, tzinfo=dt.timezone.utc),
@@ -206,12 +207,16 @@ class AggregationProcessTimeseriesTestCase(TestCase):
     )
 
     expected_result_for_max_missing_five = pd.DataFrame(
-        data={"value": [2.0, 56.0, 157.0], "flags": ["MISS", "", "MISS"]},
+        data={
+            "value": [2.0, 56.0, 157.0, 202.0],
+            "flags": ["MISSING5", "", "MISSING1", "MISSING2"],
+        },
         columns=["value", "flags"],
         index=[
             dt.datetime(2019, 5, 21, 9, 59, tzinfo=dt.timezone.utc),
             dt.datetime(2019, 5, 21, 10, 59, tzinfo=dt.timezone.utc),
             dt.datetime(2019, 5, 21, 11, 59, tzinfo=dt.timezone.utc),
+            dt.datetime(2019, 5, 21, 12, 59, tzinfo=dt.timezone.utc),
         ],
     )
 
@@ -352,3 +357,116 @@ def test_max(self, mock_regularize, mock_haggregate):
         self.aggregation.method = "max"
         self.aggregation.process_timeseries()
         self.assertEqual(mock_regularize.call_args.kwargs["mode"], RM.INTERVAL)
+
+
+@override_settings(
+    CACHES={"default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}}
+)
+class AggregationRecalculatesLastValueIfNeededTestCase(TestCase):
+    """
+    This test case uses this source time series:
+        2019-05-21 17:00:00+02:00    0.0
+        2019-05-21 17:10:00+02:00    1.0
+        2019-05-21 17:20:00+02:00    2.0
+        2019-05-21 17:30:00+02:00    3.0
+        2019-05-21 17:40:00+02:00    4.0
+        2019-05-21 17:50:00+02:00    5.0
+        2019-05-21 18:00:00+02:00    6.0
+        2019-05-21 18:10:00+02:00    7.0
+        2019-05-21 18:20:00+02:00    8.0
+        2019-05-21 18:30:00+02:00    9.0
+        2019-05-21 18:40:00+02:00   10.0
+
+    The test aggregates it to hourly; the last aggregated record (19:00) is derived
+    with two values missing from the source time series, and therefore gets the
+    MISSING2 flag. Subsequently these two records are added:
+        2019-05-21 18:50:00+02:00   11.0
+        2019-05-21 19:00:00+02:00   12.0
+
+    Then aggregation is repeated, and it is verified that the aggregated record at
+    19:00 is recalculated as needed.
+    """
+
+    def setUp(self):
+        station = baker.make(Station, name="Hobbiton", display_timezone="Etc/GMT-2")
+        timeseries_group = baker.make(
+            TimeseriesGroup,
+            gentity=station,
+            variable__descr="h",
+            precision=0,
+        )
+        source_timeseries = baker.make(
+            Timeseries,
+            timeseries_group=timeseries_group,
+            type=Timeseries.CHECKED,
+            time_step="10min",
+        )
+        start_date = dt.datetime(2019, 5, 21, 17, 0, tzinfo=get_tzinfo("Etc/GMT-2"))
+        index = [start_date + dt.timedelta(minutes=i) for i in range(0, 110, 10)]
+        values = [float(x) for x in range(11)]
+        flags = [""] * 11
+        source_timeseries.set_data(
+            pd.DataFrame(
+                data={"value": values, "flags": flags},
+                columns=["value", "flags"],
+                index=index,
+            )
+        )
+        aggregation = Aggregation(
+            timeseries_group=timeseries_group,
+            target_time_step="1h",
+            method="sum",
+            max_missing=2,
+            resulting_timestamp_offset="",
+        )
+        super(AutoProcess, aggregation).save()  # Avoid triggering a Celery task
+        self.aggregation_id = aggregation.id
+
+    def test_initial_target_timeseries(self):
+        aggregation = Aggregation.objects.get(id=self.aggregation_id)
+        aggregation.execute()
+        actual_data = aggregation.target_timeseries.get_data().data
+        expected_data = pd.DataFrame(
+            data={"value": [21.0, 34.0], "flags": ["", "MISSING2"]},
+            columns=["value", "flags"],
+            index=[
+                dt.datetime(2019, 5, 21, 18, 0, tzinfo=get_tzinfo("Etc/GMT-2")),
+                dt.datetime(2019, 5, 21, 19, 0, tzinfo=get_tzinfo("Etc/GMT-2")),
+            ],
+        )
+        expected_data.index.name = "date"
+        pd.testing.assert_frame_equal(actual_data, expected_data)
+
+    def test_updated_target_timeseries(self):
+        Aggregation.objects.get(id=self.aggregation_id).execute()
+
+        self._extend_source_timeseries()
+        aggregation = Aggregation.objects.get(id=self.aggregation_id)
+        aggregation.execute()
+
+        ahtimeseries = aggregation.target_timeseries.get_data()
+        expected_data = pd.DataFrame(
+            data={"value": [21.0, 57.0], "flags": ["", ""]},
+            columns=["value", "flags"],
+            index=[
+                dt.datetime(2019, 5, 21, 18, 0, tzinfo=get_tzinfo("Etc/GMT-2")),
+                dt.datetime(2019, 5, 21, 19, 0, tzinfo=get_tzinfo("Etc/GMT-2")),
+            ],
+        )
+        expected_data.index.name = "date"
+        pd.testing.assert_frame_equal(ahtimeseries.data, expected_data)
+
+    def _extend_source_timeseries(self):
+        aggregation = Aggregation.objects.get(id=self.aggregation_id)
+        source_timeseries = aggregation.source_timeseries
+        new_values = [11.0, 12.0]
+        new_flags = ["", ""]
+        end_date = source_timeseries.end_date
+        delta = dt.timedelta
+        new_dates = [end_date + delta(minutes=10), end_date + delta(minutes=20)]
+        new_data = pd.DataFrame(
+            data={"value": new_values, "flags": new_flags},
+            columns=["value", "flags"],
+            index=new_dates,
+        )
+        source_timeseries.append_data(new_data)
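The scenario in AggregationRecalculatesLastValueIfNeededTestCase can also be followed without a Django database. Below is a rough, self-contained pandas sketch of the aggregation semantics the test exercises: right-labeled hourly sums with a MISSING<n> flag when up to max_missing source values are absent. The real work is done by pthelma's haggregate, whose exact behavior may differ; aggregate_hourly is a made-up name for illustration, not the production algorithm.

import datetime as dt

import pandas as pd


def aggregate_hourly(source, max_missing):
    """Sum a 10-minute series to hourly (right-labeled intervals), flagging
    records computed from an incomplete hour and dropping those with more than
    max_missing source values absent."""
    resampled = source.resample("1h", closed="right", label="right")
    values = resampled.sum()
    missing = 6 - resampled.count()  # a 10-minute step means 6 values per hour
    flags = missing.map(lambda n: f"MISSING{n}" if n else "")
    ok = missing <= max_missing
    return pd.DataFrame({"value": values[ok], "flags": flags[ok]})


tz = dt.timezone(dt.timedelta(hours=2))
index = pd.date_range("2019-05-21 17:00", periods=11, freq="10min", tz=tz)
source = pd.Series([float(x) for x in range(11)], index=index)
print(aggregate_hourly(source, max_missing=2))
# 18:00 -> 21.0, no flag; 19:00 -> 34.0, "MISSING2"

extra = pd.Series([11.0, 12.0], index=index[-1] + pd.to_timedelta(["10min", "20min"]))
print(aggregate_hourly(pd.concat([source, extra]), max_missing=2))
# 18:00 -> 21.0; 19:00 -> 57.0, no flag - the recalculation _get_start_date() enables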
+ """ + + def setUp(self): + station = baker.make(Station, name="Hobbiton", display_timezone="Etc/GMT-2") + timeseries_group = baker.make( + TimeseriesGroup, + gentity=station, + variable__descr="h", + precision=0, + ) + source_timeseries = baker.make( + Timeseries, + timeseries_group=timeseries_group, + type=Timeseries.CHECKED, + time_step="10min", + ) + start_date = dt.datetime(2019, 5, 21, 17, 0, tzinfo=get_tzinfo("Etc/GMT-2")) + index = [start_date + dt.timedelta(minutes=i) for i in range(0, 110, 10)] + values = [float(x) for x in range(11)] + flags = [""] * 11 + source_timeseries.set_data( + pd.DataFrame( + data={"value": values, "flags": flags}, + columns=["value", "flags"], + index=index, + ) + ) + aggregation = Aggregation( + timeseries_group=timeseries_group, + target_time_step="1h", + method="sum", + max_missing=2, + resulting_timestamp_offset="", + ) + super(AutoProcess, aggregation).save() # Avoid triggering a celery task + self.aggregation_id = aggregation.id + + def test_initial_target_timeseries(self): + aggregation = Aggregation.objects.get(id=self.aggregation_id) + aggregation.execute() + actual_data = aggregation.target_timeseries.get_data().data + expected_data = pd.DataFrame( + data={"value": [21.0, 34.0], "flags": ["", "MISSING2"]}, + columns=["value", "flags"], + index=[ + dt.datetime(2019, 5, 21, 18, 0, tzinfo=get_tzinfo("Etc/GMT-2")), + dt.datetime(2019, 5, 21, 19, 0, tzinfo=get_tzinfo("Etc/GMT-2")), + ], + ) + expected_data.index.name = "date" + pd.testing.assert_frame_equal(actual_data, expected_data) + + def test_updated_target_timeseries(self): + Aggregation.objects.get(id=self.aggregation_id).execute() + + self._extend_source_timeseries() + aggregation = Aggregation.objects.get(id=self.aggregation_id) + aggregation.execute() + + ahtimeseries = aggregation.target_timeseries.get_data() + expected_data = pd.DataFrame( + data={"value": [21.0, 57.0], "flags": ["", ""]}, + columns=["value", "flags"], + index=[ + dt.datetime(2019, 5, 21, 18, 0, tzinfo=get_tzinfo("Etc/GMT-2")), + dt.datetime(2019, 5, 21, 19, 0, tzinfo=get_tzinfo("Etc/GMT-2")), + ], + ) + expected_data.index.name = "date" + pd.testing.assert_frame_equal(ahtimeseries.data, expected_data) + + def _extend_source_timeseries(self): + aggregation = Aggregation.objects.get(id=self.aggregation_id) + source_timeseries = aggregation.source_timeseries + new_values = [11.0, 12.0] + new_flags = ["", ""] + end_date = source_timeseries.end_date + delta = dt.timedelta + new_dates = [end_date + delta(minutes=10), end_date + delta(minutes=20)] + new_data = pd.DataFrame( + data={"value": new_values, "flags": new_flags}, + columns=["value", "flags"], + index=new_dates, + ) + source_timeseries.append_data(new_data) diff --git a/enhydris/models/timeseries.py b/enhydris/models/timeseries.py index d2103dfe..d637eece 100644 --- a/enhydris/models/timeseries.py +++ b/enhydris/models/timeseries.py @@ -348,13 +348,14 @@ def save(self, force_insert=False, force_update=False, *args, **kwargs): class TimeseriesRecord(models.Model): - # Ugly primary key hack. + # Ugly primary key hack - FIX ME in Django 5.2. # Django does not allow composite primary keys, whereas timescaledb can't work # without them. Our composite primary key in this case is (timeseries, timestamp). # What we do is set managed=False, so that Django won't create the table itself; # we create it with migrations.RunSQL(). We also set "primary_key=True" in one of # the fields. 
diff --git a/requirements.txt b/requirements.txt
index 437d23df..404bc5e3 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -16,5 +16,5 @@ django-simple-captcha>=0.5.14,<1
 django-bootstrap4>=2,<4
 requests>=2.25,<3
 defusedxml>=0.7.1,<1
-pthelma[all]>=2,<3
+pthelma[all]>=2.1,<3
 matplotlib>=3,<3.7