Update expiration design
Signed-off-by: Moritz Hoffmann <[email protected]>
antiguru committed Oct 22, 2024

1 parent de0d5dd commit 5139cf9
Showing 1 changed file with 75 additions and 4 deletions.
doc/developer/design/20240919_dataflow_expiration.md
@@ -7,7 +7,9 @@
Temporal filters currently require Materialize to maintain all future retractions
of data that is currently visible. For long windows, the retractions could be at
timestamps beyond our next scheduled restart, which is typically our weekly
DB release. Materialize needs to revisit the data at every tick, spending CPU
time linear in the number of outstanding updates, which is prohibitively
expensive.

For instance, in the example below, the temporal filter in the `last_30_days`
view causes two diffs to be generated for every row inserted into `events`, the
@@ -40,13 +42,57 @@ such retractions can be dropped.

## Success Criteria

The observed output of Materialize with expiration enabled is indistinguishable
from its output with expiration disabled.

When temporal filters are in use, retraction diffs associated with timestamps
beyond a set expiration time can be dropped without affecting correctness,
resulting in lower memory and CPU utilization by halving the number of
processed diffs.

## Solution Proposal

We define an _expiration offset_, which allows a replica to determine which
data will never be revealed to the user while maintaining correctness
invariants. An implementation is correct iff it produces exactly the same data
with replica expiration enabled or disabled.

Objects in Materialize exist in a specific timeline. For this effort, we focus
only on objects in the epoch timeline and exclude all others; this covers the
majority of objects.

At any point in time, a collection has a lower and an upper bound of times that
we can distinguish, forming a range of data that we can surface to the user. As
time advances, the lower and upper bounds can advance, too. Inserting new data
moves the upper bound, compacting old data moves the lower bound. Relating the
wall-clock time to future frontiers allows us to specify the semantics for
expiring data.

Specifically, we are interested in determining how far the upper frontier of a
collection will have advanced at the point of expiration. We can drop all data
past the upper frontier at the time of expiration, but never any data before
it.
* Sources and tables tick forward in relation to the system's wall-clock time.
They never jump forward unless dropped.
* Load generators tick forward in accordance with their implementation,
allowing all valid frontier movements.
* Indexes tick with respect to their inputs.
* Materialized views tick with respect to their inputs, and adhere to a refresh
schedule.
* Subscribes tick forward with respect to their inputs up to a user-defined
time.
* Selects query a single point in time, and thus are not affected by this
feature.
* Constant collections are valid from the beginning to the end of time.

An object that depends on multiple inputs ticks forward at the rate of its
slowest input, i.e., the meet of all its inputs' uppers.
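
To make this concrete, here is a minimal sketch assuming totally ordered
`EpochMillis` timestamps (so the meet of frontiers reduces to a minimum); the
types and names are illustrative, not the actual Materialize ones:

```rust
/// Timestamp in the EpochMillis timeline: milliseconds since the Unix epoch.
type Timestamp = u64;

/// The upper frontier of a collection; `None` models the empty frontier,
/// i.e., the collection will never produce further updates.
type Upper = Option<Timestamp>;

/// An object with several inputs ticks at the rate of its slowest input:
/// its upper is the meet of all input uppers, which for totally ordered
/// times is the minimum, with the empty frontier as the identity.
fn derived_upper(input_uppers: &[Upper]) -> Upper {
    input_uppers
        .iter()
        .copied()
        .fold(None, |acc, upper| match (acc, upper) {
            (None, u) => u,
            (acc, None) => acc,
            (Some(a), Some(b)) => Some(a.min(b)),
        })
}

fn main() {
    // A dataflow reading from a table at 1_000ms and an index at 750ms
    // ticks forward at the rate of the slower input.
    assert_eq!(derived_upper(&[Some(1_000), Some(750)]), Some(750));
    // A constant collection (empty frontier) never holds its consumers back.
    assert_eq!(derived_upper(&[Some(750), None]), Some(750));
}
```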

The expiration offset is a global setting. When creating a new replica, we
capture the value and never update it later. Replicas convert the offset into
an absolute timestamp based on their wall-clock time. When rendering a
dataflow, each replica determines an appropriate expiration time based on the
dataflow's upper at the time of expiration.
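
As a minimal sketch (the names and types below are assumptions, not
Materialize's actual code), a replica could turn the configured offset into an
absolute expiration timestamp at startup and use it to drop updates it will
never have to reveal:

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Milliseconds since the Unix epoch, matching the EpochMillis timeline.
type Timestamp = u64;

/// Captured once when the replica starts and never updated afterwards.
fn replica_expiration(replica_start: SystemTime, offset: Duration) -> Timestamp {
    let start_ms = replica_start
        .duration_since(UNIX_EPOCH)
        .expect("replica started after the Unix epoch")
        .as_millis() as u64;
    start_ms + offset.as_millis() as u64
}

/// A single update as produced by a temporal filter: data, timestamp, diff.
struct Update<D> {
    data: D,
    time: Timestamp,
    diff: i64,
}

/// Updates at or beyond the expiration time are never revealed to users
/// before the replica restarts, so the replica may drop them.
fn retain<D>(update: &Update<D>, expiration: Timestamp) -> bool {
    update.time < expiration
}

fn main() {
    // A hypothetical three-day offset captured at replica start.
    let expiration =
        replica_expiration(SystemTime::now(), Duration::from_secs(3 * 24 * 3600));
    let retraction = Update { data: "event", time: expiration + 1, diff: -1 };
    assert!(!retain(&retraction, expiration));
}
```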

A new LaunchDarkly feature flag is introduced that specifies an _expiration
offset_ (a `Duration`). The _replica expiration_ time is computed as the offset
added to the start time of the replica. Dataflows matching certain
@@ -56,6 +102,34 @@ the dataflow expiration are dropped. To ensure correctness, panic checks are
added to these dataflows that ensure that the frontier does not exceed the
dataflow expiration before the replica is restarted.
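
A sketch only (placeholder names, not the actual operator): such a check
compares an observed frontier against the dataflow expiration and aborts
loudly rather than produce wrong answers.

```rust
/// Milliseconds since the Unix epoch; `None` models the empty frontier.
type Frontier = Option<u64>;

/// Panics if a dataflow's frontier reaches or exceeds its expiration time.
/// Hitting this check means the replica outlived its assumed restart deadline,
/// at which point dropped retractions could otherwise become visible.
fn check_expiration(frontier: Frontier, expiration: u64) {
    if let Some(time) = frontier {
        assert!(
            time < expiration,
            "dataflow frontier {time} reached expiration {expiration}; \
             the replica must restart before this point"
        );
    }
}

fn main() {
    // Well before the expiration: nothing happens.
    check_expiration(Some(10), 1_000);
}
```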

The environment and the replica each have only partial information about the
per-dataflow expiration time: the environment knows the dependency tree and the
properties of each object, while only the replica knows the local expiration
time. Thus, a dataflow description needs to encode enough breadcrumbs for the
replica to make a correct decision about whether and when to expire a dataflow.
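
One hypothetical shape for those breadcrumbs, purely illustrative (the names
and fields below are assumptions, not the actual dataflow description): the
environment records what it knows about the dataflow, and the replica combines
it with its local expiration time.

```rust
/// What the environment can encode about a dataflow without knowing the
/// replica's wall-clock time. Hypothetical; not an actual Materialize struct.
struct ExpirationHints {
    /// The dataflow lives entirely in the EpochMillis timeline.
    epoch_timeline: bool,
    /// It transitively depends on a materialized view with a refresh schedule.
    has_refresh_schedule: bool,
    /// A user-specified `until`, if any (e.g. a SUBSCRIBE's up-to), in millis.
    user_until: Option<u64>,
}

/// The replica combines the hints with its local expiration time to decide
/// if and when the dataflow should expire.
fn dataflow_expiration(hints: &ExpirationHints, replica_expiration: u64) -> Option<u64> {
    if !hints.epoch_timeline || hints.has_refresh_schedule {
        // Frontiers cannot be related to wall-clock time; never expire.
        return None;
    }
    match hints.user_until {
        // A user-provided `until` at or before the expiration already bounds
        // the dataflow, so expiration adds nothing.
        Some(until) if until <= replica_expiration => None,
        _ => Some(replica_expiration),
    }
}

fn main() {
    let hints = ExpirationHints {
        epoch_timeline: true,
        has_refresh_schedule: false,
        user_until: None,
    };
    assert_eq!(dataflow_expiration(&hints, 1_000), Some(1_000));
}
```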

**An object is _definite up to_ a frontier as a function of an expiration
time** (see the sketch after this list):
* Sources are definite up to the expiration time.
* Load generators are definite up to the minimum frontier.
* Indexes are definite up to the meet of their inputs.
* Constant collections are definite up to the empty frontier.
* Tables are definite up to the expiration time.
* Materialized views are definite up to the meet of their inputs, rounded up to
the refresh schedule.
* Subscribes are definite up to the meet of their up-to and their inputs.
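
A minimal sketch of the list above as a function (illustrative names only;
assuming totally ordered EpochMillis times so that the meet of frontiers is a
minimum, and modelling the empty frontier as `None`):

```rust
/// Milliseconds since the Unix epoch; `None` models the empty frontier.
type Frontier = Option<u64>;

enum Object {
    Source,
    Table,
    LoadGenerator,
    Constant,
    /// Indexes and materialized views carry the definite-up-to frontiers of
    /// their inputs; materialized views additionally round up to a refresh.
    Index { inputs: Vec<Frontier> },
    MaterializedView { inputs: Vec<Frontier>, next_refresh: u64 },
    Subscribe { inputs: Vec<Frontier>, up_to: Frontier },
}

/// Meet of frontiers; with totally ordered times this is a minimum, and the
/// empty frontier (`None`) is the identity.
fn meet(frontiers: &[Frontier]) -> Frontier {
    frontiers.iter().copied().fold(None, |acc, f| match (acc, f) {
        (None, f) => f,
        (acc, None) => acc,
        (Some(a), Some(b)) => Some(a.min(b)),
    })
}

/// How far an object is definite, as a function of the expiration time.
fn definite_up_to(object: &Object, expiration: u64) -> Frontier {
    match object {
        Object::Source | Object::Table => Some(expiration),
        Object::LoadGenerator => Some(0), // the minimum frontier
        Object::Constant => None,         // the empty frontier
        Object::Index { inputs } => meet(inputs),
        Object::MaterializedView { inputs, next_refresh } => {
            // Simplification: a single upcoming refresh stands in for the
            // full refresh schedule.
            meet(inputs).map(|t| t.max(*next_refresh))
        }
        Object::Subscribe { inputs, up_to } => {
            let mut all = inputs.clone();
            all.push(*up_to);
            meet(&all)
        }
    }
}

fn main() {
    let index = Object::Index { inputs: vec![Some(500), Some(900)] };
    assert_eq!(definite_up_to(&index, 1_000), Some(500));
    assert_eq!(definite_up_to(&Object::Table, 1_000), Some(1_000));
}
```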

### Distinguishing expiration from dataflow shutdown

When a dataflow shuts down, its frontier advances to the empty frontier. The
persist source assumes that once the input frontiers reach the `until`, it is
free to shut down the dataflow. This is correct if the `until` is specified by
the user, but incorrect if the `until` is set to expire future updates.
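
One hypothetical way to make that distinction explicit (a sketch of the open
question in the TODO below; none of these names exist in the codebase) is to
record why the `until` was set, so the persist source only treats a
user-provided `until` as permission to shut down:

```rust
/// Why a dataflow's `until` frontier was set. Hypothetical; this is one shape
/// the "token" discussed in the TODO below could take.
enum UntilKind {
    /// Set by the user (e.g. a SUBSCRIBE's up-to): reaching it means the
    /// dataflow is complete and may shut down.
    UserSpecified,
    /// Set to the dataflow expiration: reaching it means the replica outlived
    /// its restart deadline, which must not be treated as completion.
    Expiration,
}

/// Only a user-specified `until` is permission to shut the dataflow down.
fn may_shut_down(kind: &UntilKind) -> bool {
    matches!(kind, UntilKind::UserSpecified)
}

fn main() {
    assert!(may_shut_down(&UntilKind::UserSpecified));
    assert!(!may_shut_down(&UntilKind::Expiration));
}
```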

TODO:
* Checking frontiers at the input vs. output.
* Do we need a token to distinguish shutdown variants? Ideally, we know from
  the structure of the dataflow and the objects it depends on what advancing to
  the empty frontier means.

An overview of the logic used for these features is as follows:
```
# Consider the `upper` for different dataflows
@@ -128,6 +202,3 @@ Dataflow expiration is disabled for the following cases:

- Dataflows whose timeline type is not `Timeline::EpochMillis`. We rely on the
frontier timestamp being comparable to the wall-clock time of the replica.
- Dataflows that transitively depend on a materialized view with a non-default
refresh schedule. Handling such materialized views would require additional
logic to track the refresh schedules and ensure that the dataflow expiration
