Support Gap Filling on Time Series Data #4809

Open
wolffcm opened this issue Jan 3, 2023 · 21 comments
Labels
enhancement New feature or request

Comments

@wolffcm
Contributor

wolffcm commented Jan 3, 2023

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

A common use case when working with time series data is to compute an aggregate value for windows of time, e.g., every minute, hour, week or whatever. It is possible to do this with the DATE_BIN function in DataFusion. However, DATE_BIN will not produce any value for a window that did not contain any rows.

For example, for this input data:

| time       | c0 |
|------------+----|
| 2022-12-01 | 10 |
| 2022-12-03 | 30 |

We might run this query:

select
  date_bin(interval '1 day', time, timestamp '1970-01-01T00:00:00Z') as day,
  avg(c0) 
from t
group by day;

And we would get something like:

| day        | avg |
|------------+-----|
| 2022-12-01 | 10  |
| 2022-12-03 | 30  |
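
To make the missing row concrete, here is a minimal Python sketch (not DataFusion code) of what DATE_BIN plus GROUP BY computes: each timestamp is floored to the start of its stride-sized window relative to the origin, and values are averaged per window. The helpers date_bin and avg_by_bin are hypothetical. Because windows are only created from rows that exist, nothing is emitted for 2022-12-02.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def date_bin(stride: timedelta, ts: datetime, origin: datetime) -> datetime:
    """Floor ts to the start of its stride-sized window, aligned to origin."""
    # timedelta // timedelta is an int count of whole strides (floors toward -inf)
    n = (ts - origin) // stride
    return origin + n * stride


def avg_by_bin(rows, stride, origin):
    """GROUP BY date_bin(...) with avg(): only windows that contain rows appear."""
    groups = defaultdict(list)
    for ts, value in rows:
        groups[date_bin(stride, ts, origin)].append(value)
    return {day: sum(vals) / len(vals) for day, vals in sorted(groups.items())}


rows = [(datetime(2022, 12, 1), 10), (datetime(2022, 12, 3), 30)]
result = avg_by_bin(rows, timedelta(days=1), datetime(1970, 1, 1))
for day, avg in result.items():
    print(day.date(), avg)
```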

Generating a row in the output for 2022-12-02 is difficult to do with ANSI SQL. Here is one attempt: "Fill Gaps in Time Series with this simple trick in SQL". Having to write SQL like this for such an intuitive and common use case is frustrating.

Describe the solution you'd like

It would be good to have a concise, idiomatic way to do this. Many vendors provide a solution for this problem. They have the following in common:

  • They provide a way to break up an interval of time into contiguous windows
  • They provide some kind of way to produce a value where there were no input rows

One such solution would be to use functions like Timescale's time_bucket_gapfill and locf (last observation carried forward):
https://docs.timescale.com/api/latest/hyperfunctions/gapfilling-interpolation/time_bucket_gapfill/

The above query might be changed to this, using time_bucket_gapfill and locf:

select
  time_bucket_gapfill(interval '1 day', time, timestamp '1970-01-01T00:00:00Z') as day,
  avg(c0),
  locf(avg(c0))
from t
group by day;

| day        | avg | locf |
|------------+-----+------|
| 2022-12-01 | 10  | 10   |
| 2022-12-02 |     | 10   |
| 2022-12-03 | 30  | 30   |
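
A minimal sketch of the semantics that output implies: enumerate every window between a start and an exclusive end, emit NULL (None here) for windows with no aggregate, and carry the last non-NULL value forward for locf. gapfill and locf below are hypothetical Python helpers, not Timescale's or DataFusion's implementation.

```python
from datetime import datetime, timedelta


def gapfill(aggregated, start, end, stride):
    """Return (window, value) for every window in [start, end); value is None in gaps."""
    out = []
    t = start
    while t < end:
        out.append((t, aggregated.get(t)))
        t += stride
    return out


def locf(values):
    """Last observation carried forward: replace each None with the previous non-None."""
    last = None
    filled = []
    for v in values:
        if v is not None:
            last = v
        filled.append(last)
    return filled


aggregated = {datetime(2022, 12, 1): 10.0, datetime(2022, 12, 3): 30.0}
rows = gapfill(aggregated, datetime(2022, 12, 1), datetime(2022, 12, 4), timedelta(days=1))
avgs = [v for _, v in rows]
for (day, avg), carried in zip(rows, locf(avgs)):
    print(day.date(), avg, carried)
```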

Timescale also provides interpolate to populate a gap with an interpolated value (e.g., it would put 20 in the gap in this example).
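
Linear interpolation fills a gap from the nearest known values on both sides, weighted by distance. A sketch, assuming evenly spaced windows so that list position stands in for time (interpolate here is a hypothetical helper). Gaps at the edges have no neighbor on one side and stay None, which matches the interpolate column in the Timescale test output later in this thread.

```python
def interpolate(values):
    """Fill interior runs of None by linear interpolation between known neighbors."""
    out = list(values)
    known = [i for i, v in enumerate(values) if v is not None]
    for left, right in zip(known, known[1:]):
        span = right - left
        for i in range(left + 1, right):
            frac = (i - left) / span
            out[i] = values[left] + frac * (values[right] - values[left])
    return out


print(interpolate([10.0, None, 30.0]))
```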

I've written up an approach to this work here:
https://docs.google.com/document/d/1vIcs9uhlCX_AkD9bemcDx-YhBOVe_TW5sBbXtKCHIfk/edit?usp=sharing

Initially we (InfluxData) were going to implement this in IOx directly, but it seems like it could be worth upstreaming into DataFusion.

Describe alternatives you've considered

Postgres provides a general purpose way to generate data:
https://www.postgresql.org/docs/9.1/functions-srf.html#FUNCTIONS-SRF-SERIES
But this seems like it would be more difficult to use than something like time_bucket_gapfill.
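
For comparison, a sketch of the generate_series route: first materialize every window (Postgres's generate_series(start, stop, step) includes stop when it is reached), then LEFT JOIN the aggregated rows onto it. The Python below only emulates that shape; generate_series and left_join here are hypothetical stand-ins, not the Postgres functions.

```python
from datetime import datetime, timedelta


def generate_series(start, stop, step):
    """Yield start, start + step, ... up to and including stop."""
    t = start
    while t <= stop:
        yield t
        t += step


def left_join(series, aggregated):
    """Keep every generated window; windows with no aggregate get None (NULL)."""
    return [(t, aggregated.get(t)) for t in series]


aggregated = {datetime(2022, 12, 1): 10.0, datetime(2022, 12, 3): 30.0}
days = generate_series(datetime(2022, 12, 1), datetime(2022, 12, 3), timedelta(days=1))
joined = left_join(days, aggregated)
print(joined)
```

In SQL this becomes roughly a FROM generate_series(...) joined with LEFT JOIN to the aggregate subquery, which is the extra boilerplate a function like time_bucket_gapfill hides.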

@wolffcm wolffcm added the enhancement New feature or request label Jan 3, 2023
@alamb
Contributor

alamb commented Jan 4, 2023

@waynexia @jiacai2050 (CeresDB) @v0y4g3r (GrepTime) @gruuya (SeaFowl) As I believe you are building other timeseries database systems on DataFusion, I wonder if you have any thoughts about adding such a feature to DataFusion?

We plan on building this feature natively in IOx but might be willing to upstream it as well if there is community interest

cc @waitingkuo @andygrove and @liukun4515 in case you have thoughts as well

@waynexia
Member

waynexia commented Jan 5, 2023

> I wonder if you have any thoughts about adding such a feature to DataFusion?

Having time_bucket_gapfill and locf natively in DataFusion looks good to me. Selecting and post-processing data in a TSDB is a bit more complex than in a general-purpose DBMS. Prometheus/PromQL also has this kind of logic, which tries to align and look back over each "series":
https://promlabs.com/blog/2020/07/02/selecting-data-in-promql#lookback-delta

I built something similar very recently: https://github.com/GreptimeTeam/greptimedb/blob/develop/src/promql/src/extension_plan/instant_manipulate.rs#L423

We plan to expose this functionality through the SQL interface in some way, which I think will end up similar to this proposal.

But my concern is that, to provide a good user experience and functionality, we may need a bunch of "gap-filling" functions, like filling with null, filling with the last value, or filling with the last value if the gap is less than 1 day and otherwise leaving it blank. I'm not sure if these "time-series functions" are also useful to other users of DataFusion (but it might be fine, as PostgreSQL also provides such utilities).
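
The third policy above (carry the last value forward only while the gap is shorter than some limit) could look like this sketch. locf_bounded and its max_gap parameter are hypothetical, and the example uses 6-hour windows with a 12-hour limit for brevity.

```python
from datetime import datetime, timedelta


def locf_bounded(rows, max_gap):
    """Carry the last value forward only while the time since it was observed is < max_gap."""
    last_time, last_value = None, None
    out = []
    for t, v in rows:
        if v is not None:
            last_time, last_value = t, v
            out.append((t, v))
        elif last_time is not None and t - last_time < max_gap:
            out.append((t, last_value))
        else:
            out.append((t, None))  # gap too long (or no prior value): leave blank
    return out


hours = [datetime(2022, 12, 1, h) for h in range(0, 24, 6)]
rows = list(zip(hours, [1.0, None, None, None]))
filled = locf_bounded(rows, timedelta(hours=12))
print(filled)
```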

@jiacai2050
Contributor

jiacai2050 commented Jan 5, 2023

@alamb Thanks for bringing me in.

I have one concern about implementing this feature in DataFusion.

Suppose the time range of a query is [2022-12-02, 2022-12-04], and the real dataset is

  • 2022-12-01, 10
  • 2022-12-03, 20
  • 2022-12-04, 30

Then what is the result of locf(avg(c0)) for 2022-12-02: None or 10?

In Prometheus, it will be 10 when lookback-delta is 1d, and this requires rewriting the time range of the query to [start-delta, end] and trimming the data to [start, end] after filling in gaps.

I don't know how Timescale deals with this case. IMO, rewriting the time range of a query may not be suitable for DataFusion, since it's a generic SQL engine.

Any ideas about this first value issue?
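
A minimal sketch of the Prometheus-style behavior described above, assuming samples sorted by time: the scan is widened to [start - delta, end], each output step takes the most recent sample within delta, and only steps in [start, end] are produced. lookback_fill is hypothetical, not Prometheus code, and the lookback window here is inclusive on both ends, an assumption chosen to match the example (exact boundary handling may differ in Prometheus).

```python
import bisect
from datetime import datetime, timedelta


def lookback_fill(samples, start, end, step, delta):
    """For each step t in [start, end], return the latest sample with t - delta <= ts <= t."""
    # Widen the scan to [start - delta, end] so values just before `start` are visible.
    scanned = [(ts, v) for ts, v in samples if start - delta <= ts <= end]
    times = [ts for ts, _ in scanned]
    out = []
    t = start
    while t <= end:
        i = bisect.bisect_right(times, t) - 1  # latest sample at or before t
        if i >= 0 and t - times[i] <= delta:
            out.append((t, scanned[i][1]))
        else:
            out.append((t, None))
        t += step
    return out


samples = [(datetime(2022, 12, 1), 10), (datetime(2022, 12, 3), 20), (datetime(2022, 12, 4), 30)]
result = lookback_fill(samples, datetime(2022, 12, 2), datetime(2022, 12, 4),
                       timedelta(days=1), timedelta(days=1))
print(result)
```

With a 1-day delta, 2022-12-02 picks up the 10 from 2022-12-01; with a shorter delta it would stay None.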

@ozankabak
Contributor

We are interested in this too. Are you aware of any approaches other than the two we have so far (time_bucket_gapfill and generate_series)? Any planned features related to this in recent and/or upcoming SQL standards and whatnot? The only possible pitfall I can see is not doing our homework before choosing an approach and running with it -- other than that this would be a great feature to have.

@alamb
Contributor

alamb commented Jan 5, 2023

@ozankabak

> Are you aware of any approaches other than the two we have so far (time_bucket_gapfill and generate_series)?

I don't know of any upcoming SQL standard for this, but I didn't look hard for one either.

This use case is common. It is often called bucketing with "gap filling" or "interpolation" in other SQL implementations. This type of query is not easy to express in ANSI-SQL and thus databases often offer some sort of SQL extension.

Here are some example extensions I found:

All of these extensions have two main features:

  • Some way to define regular windows of time across a contiguous interval of time, even if there are no time values in those windows
  • An interpolation policy specifying what value to use (e.g. previous value, NULL, etc.) when there is no data for a window in the database

@alamb
Contributor

alamb commented Jan 5, 2023

@waynexia

> But my concern is that, to provide a good user experience and functionality, we may need a bunch of "gap-filling" functions, like filling with null, filling with the last value, or filling with the last value if the gap is less than 1 day and otherwise leaving it blank. I'm not sure if these "time-series functions" are also useful to other users of DataFusion (but it might be fine, as PostgreSQL also provides such utilities).

I agree specifying the interpolation policy / gap filling is important.

In addition to the simple locf function (https://docs.timescale.com/api/latest/hyperfunctions/gapfilling-interpolation/locf/), Timescale has interpolate, and one of its examples shows how to do a complicated lookup that appears similar to what you are suggesting:

https://docs.timescale.com/api/latest/hyperfunctions/gapfilling-interpolation/interpolate/

I wonder if that would be sufficient 🤔

@alamb
Contributor

alamb commented Jan 5, 2023

@jiacai2050

> Suppose the time range of a query is [2022-12-02, 2022-12-04], and the real dataset is
>
>   • 2022-12-01, 10
>   • 2022-12-03, 20
>   • 2022-12-04, 30
>
> Then what is the result of locf(avg(c0)) for 2022-12-02: None or 10?

I am not sure

> I don't know how Timescale deals with this case. IMO, rewriting the time range of a query may not be suitable for DataFusion, since it's a generic SQL engine.

It may well be the case that this is something that is not easy / reasonable to express in SQL.

> Any ideas about this first value issue?

The time_bucket_gapfill function (docs) can take optional start and finish arguments, which perhaps offer a way to express this case (apply start/finish filters after the query?)

Another way I could imagine is to run a subquery that covers the full range [2022-12-01, 2022-12-04] with time_bucket_gapfill and then apply a filter in an outer query to restrict the data to [2022-12-02, 2022-12-04].
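
The subquery idea can be sketched as follows: the inner step gap-fills and applies locf over the wider range, so it still sees 2022-12-01, and only afterwards does the outer filter trim the result. locf_over is a hypothetical helper, and the sketch assumes the per-day aggregate has already been computed.

```python
from datetime import datetime, timedelta


def locf_over(aggregated, start, end, stride):
    """Inner query: gap-fill [start, end] and carry the last value forward."""
    out, last, t = [], None, start
    while t <= end:
        if aggregated.get(t) is not None:
            last = aggregated[t]
        out.append((t, last))
        t += stride
    return out


daily_avg = {datetime(2022, 12, 1): 10, datetime(2022, 12, 3): 20, datetime(2022, 12, 4): 30}
inner = locf_over(daily_avg, datetime(2022, 12, 1), datetime(2022, 12, 4), timedelta(days=1))
# Outer query: restrict to the range the user actually asked for.
outer = [(t, v) for t, v in inner if t >= datetime(2022, 12, 2)]
print(outer)
```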

@waynexia
Member

waynexia commented Jan 5, 2023

> I wonder if that would be sufficient 🤔

Thanks for those links. interpolate looks amazing and powerful; allowing a sub-select as the argument can cover many requirements 👍 It would be nice if we supported this function.

@wolffcm
Contributor Author

wolffcm commented Jan 5, 2023

Here is a design document to do this work in DataFusion:
https://docs.google.com/document/d/1vIcs9uhlCX_AkD9bemcDx-YhBOVe_TW5sBbXtKCHIfk/edit?usp=sharing

Initially I wrote this design thinking it would go into IOx, but since there is some interest here, I have adapted it a bit for DataFusion. Feedback on the approach would be appreciated!

@ozankabak
Contributor

I would like to think a little bit about how one can do this with multi-row window functions (i.e. window functions that may generate multiple rows for every frame). It seems the syntax and semantics of such an approach would be more in line with standard SQL's treatment of windowing (which would be good from a POLA perspective). I will share my thoughts in a few days when things mature a little bit.

@liukun4515
Contributor

liukun4515 commented Jan 7, 2023

> @waynexia @jiacai2050 (CeresDB) @v0y4g3r (GrepTime) @gruuya (SeaFowl) As I believe you are building other timeseries database systems on DataFusion, I wonder if you have any thoughts about adding such a feature to DataFusion?
>
> We plan on building this feature natively in IOx but might be willing to upstream it as well if there is community interest
>
> cc @waitingkuo @andygrove and @liukun4515 in case you have thoughts as well

If the SQL syntax and usage are compatible with PG, I think it can be added to DataFusion easily.
If the SQL syntax and usage are not compatible with PG or other RDBMSs, I think we should add the feature in the corresponding library or project instead of DataFusion.
Adding it to DataFusion would break the SQL standards.

@liukun4515
Contributor

> Here is a design document to do this work in DataFusion: https://docs.google.com/document/d/1vIcs9uhlCX_AkD9bemcDx-YhBOVe_TW5sBbXtKCHIfk/edit?usp=sharing
>
> Initially I wrote this design thinking it would go into IOx, but since there is some interest here, I have adapted it a bit for DataFusion. Feedback on the approach would be appreciated!

Thanks @wolffcm.
I need more time to take a look at your document.

@jiacai2050
Contributor

jiacai2050 commented Jan 10, 2023

> The time_bucket_gapfill function (docs) can take optional start and finish arguments, which perhaps offer a way to express this case (apply start/finish filters after the query?)

I'm afraid this doesn't work; the Timescale docs say:

> start and finish arguments do not filter input rows.

start/finish are applied after the data is scanned (using WHERE), so if the fetched data contains no value for 2022-12-01, the upper plan node won't see it.

I ran the following test against Timescale:

CREATE TABLE stocks_real_time (
  time TIMESTAMPTZ NOT NULL,
  price DOUBLE PRECISION NULL
);

SELECT create_hypertable('stocks_real_time','time');

insert into stocks_real_time values
       ('2022-10-01', 10),
       ('2022-10-03', 30),
       ('2022-10-04', 40),
       ('2022-10-05', 50);

SELECT
  time_bucket_gapfill('1 day', time, timestamp '2022-09-30', timestamp '2022-10-10') AS day,
  avg(price) AS value,
  locf(avg(price)),
  interpolate(avg(price))
FROM stocks_real_time
WHERE time > '2022-10-02' AND time < '2022-10-05'
GROUP BY day
ORDER BY day;

It outputs:

| day                    | value | locf | interpolate |
|------------------------+-------+------+-------------|
| 2022-09-30 00:00:00+00 |       |      |             |
| 2022-10-01 00:00:00+00 |       |      |             |
| 2022-10-02 00:00:00+00 |       |      |             |
| 2022-10-03 00:00:00+00 |    30 |   30 |          30 |
| 2022-10-04 00:00:00+00 |    40 |   40 |          40 |
| 2022-10-05 00:00:00+00 |       |   40 |             |
| 2022-10-06 00:00:00+00 |       |   40 |             |
| 2022-10-07 00:00:00+00 |       |   40 |             |
| 2022-10-08 00:00:00+00 |       |   40 |             |
| 2022-10-09 00:00:00+00 |       |   40 |             |
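
The output is consistent with the evaluation order below, sketched in plain Python (an ordering inferred from the result, not Timescale source code): the WHERE clause filters rows before bucketing, so the 2022-10-01 value is gone before locf ever runs, and start/finish only decide which empty buckets get emitted. The interpolate column is omitted for brevity.

```python
from datetime import datetime, timedelta

rows = [(datetime(2022, 10, d), float(p)) for d, p in [(1, 10), (3, 30), (4, 40), (5, 50)]]

# 1. WHERE runs first, so 2022-10-01 never reaches the gap-filling step.
#    Every input row is at midnight, so each row is already its own 1-day bucket.
lo, hi = datetime(2022, 10, 2), datetime(2022, 10, 5)
kept = {t: p for t, p in rows if lo < t < hi}

# 2. Gap fill [start, finish) with 1-day buckets, then apply locf.
start, finish, day = datetime(2022, 9, 30), datetime(2022, 10, 10), timedelta(days=1)
table, last, t = [], None, start
while t < finish:
    value = kept.get(t)
    if value is not None:
        last = value
    table.append((t.date().isoformat(), value, last))
    t += day

for row in table:
    print(row)
```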

> Another way I could imagine is to run a subquery that covers the full range [2022-12-01, 2022-12-04] with time_bucket_gapfill and then apply a filter in an outer query to restrict the data to [2022-12-02, 2022-12-04].

A subquery seems unnecessary. If the time range in time_bucket_gapfill differs from the range in the WHERE clause, maybe we can rewrite the WHERE clause and filter the data in the GapFill plan node, something like this (adapted from the Google doc above):

Projection: datebin(...) AS day, locf
  GapFill: groupBy=[[datebin_gapfill(..)]], aggr=[[locf(avg(price)) as locf]], original_time=(2022-10-02, 2022-10-05)
    Sort: cpu ASC NULLS LAST, datebin(...) ASC NULLS LAST
      Aggregate: groupBy=[[datebin(...) AS datebin(...)]], aggr=[]
        TableScan: stocks_real_time projection=[time, price]   -- time range is rewritten as (2022-09-30, 2022-10-10)
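
The Sort below GapFill matters because, with input ordered by (series key, time), all rows for one series arrive together, so a gap-filling operator only needs to hold one series at a time and can restart both the window sequence and the carried-forward value whenever the key changes. A minimal sketch, assuming a cpu-style series key as in the plan's Sort; gap_fill_sorted and the sample data are hypothetical.

```python
from datetime import datetime, timedelta
from itertools import groupby


def gap_fill_sorted(rows, start, end, stride):
    """rows: (series, window, value) sorted by series. Yields gap-filled rows per series."""
    for series, group in groupby(rows, key=lambda r: r[0]):
        present = {window: value for _, window, value in group}
        last, t = None, start  # locf state restarts for each series
        while t < end:
            if t in present:
                last = present[t]
            yield (series, t, last)
            t += stride


def d(day):
    return datetime(2022, 10, day)


rows = [("cpu0", d(1), 1.0), ("cpu0", d(3), 3.0), ("cpu1", d(2), 2.0)]
out = list(gap_fill_sorted(rows, d(1), d(4), timedelta(days=1)))
print(out)
```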

@wolffcm @alamb Does that make sense?

@wolffcm
Contributor Author

wolffcm commented Jan 11, 2023

@jiacai2050

> A subquery seems unnecessary. If the time range in time_bucket_gapfill differs from the range in the WHERE clause, maybe we can rewrite the WHERE clause and filter the data in the GapFill plan node, something like this (adapted from the Google doc above):

I understand what you're suggesting, but I worry that rewriting a filter like that would have unforeseen effects that are difficult to understand. For example, if the input to Aggregate was not a simple scan or filter, but instead the output of a derived table or a join, it could be hard to do a rewrite. What would the behavior be for that case?

I think this problem is a really tricky one. In the Timescale docs for locf() there is a prev parameter which solves this problem. It is basically a subquery. It's a little awkward to type, but it has the advantage of not requiring rewriting other parts of the plan.

  locf(
    avg(temperature),
    (SELECT temperature FROM metrics m2 WHERE m2.time < now() - INTERVAL '2 week' AND m.device_id = m2.device_id ORDER BY time DESC LIMIT 1)
  )
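
With a prev argument, the carried value starts from a caller-supplied scalar instead of NULL, so the first windows can be filled even though the WHERE clause removed the earlier rows. A sketch, assuming prev has already been evaluated to a single value (as a scalar subquery would be); locf_with_prev is a hypothetical helper, not Timescale's implementation.

```python
def locf_with_prev(values, prev=None):
    """locf where the carried value starts as `prev` rather than None."""
    last = prev
    out = []
    for v in values:
        if v is not None:
            last = v
        out.append(last)
    return out


# Windows 2022-12-02..2022-12-04 after filtering; prev stands in for the 2022-12-01 value.
print(locf_with_prev([None, 20, 30], prev=10))
print(locf_with_prev([None, 20, 30]))
```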

I'm curious about what you think of that approach.

@jiacai2050
Contributor

It seems a subquery is more flexible and "safer" than rewriting the WHERE clause.

One more question: if the subquery returns multiple values, which value will be chosen by locf?

@liukun4515
Contributor

> But my concern is that, to provide a good user experience and functionality, we may need a bunch of "gap-filling" functions, like filling with null, filling with the last value, or filling with the last value if the gap is less than 1 day and otherwise leaving it blank.
>
> I'm not sure if these "time-series functions" are also useful to other users of DataFusion (but it might be fine, as PostgreSQL also provides such utilities).

@alamb
I also have this concern: some time-series-specific functions may not be compatible with PG, and they may not be useful to other users who use DataFusion just like PG.

@wolffcm
Contributor Author

wolffcm commented Jan 11, 2023

@jiacai2050

> One more question: if the subquery returns multiple values, which value will be chosen by locf?

I just tried this in Timescale. If more than one value is returned, it returns the error "more than one row returned by a subquery used as an expression", similar to other situations where a subquery is expected to return a scalar.

@ozankabak
Contributor

I agree with @liukun4515 that unless we come up with something that deviates very little from standard SQL (and/or PG), it may be prudent to think on this and maybe leave it to other packages if we can't find a way.

I am not hopeless BTW -- I think there are ways to do this in a very much standard-like way, I just haven't had the time to look into it.

@alamb
Contributor

alamb commented Jan 11, 2023

I think in terms of IOx we are happy to do it downstream in IOx via the existing DataFusion extension points as well. I think it would help @wolffcm to know which way we are leaning, so we can avoid too much rework.

@jirislav

This issue hasn't moved at all in 1.5 years; what's the status here? I see that IOx has implemented this as a UDF, but it seems to me that standard SQL (and/or PG) compatibility is taken too seriously within DataFusion and inherently limits its adoption in time-series applications such as finance or IoT, both quite big and growing industries.

I wouldn't suggest this if IOx had stayed open source, but since it no longer is, couldn't it be supported at least through some kind of feature flag, something like SETTINGS enable_non_sql_standard_time_series_features = 1, similar to how ClickHouse does it?

@alamb
Contributor

alamb commented Jun 27, 2024

> This issue hasn't moved at all in 1.5 years; what's the status here?

I don't think there is any new status to report from my perspective

> I wouldn't suggest this if IOx had stayed open source, but since it no longer is,

Our implementation's source is currently in influxdb3_core: https://github.com/influxdata/influxdb3_core/blob/0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94/query_functions/src/lib.rs#L33 in case anyone cares

FWIW "soon" InfluxData plans an open source offering based on the 3.0 architecture (aka IOx) but I don't have any additional specific details to share there

> couldn't it be supported at least through some kind of feature flag, something like SETTINGS enable_non_sql_standard_time_series_features = 1, similar to how ClickHouse does it?

I think that would be possible

To improve timeseries support in DataFusion itself, I think working on ASOF join might be a good first step as that is more "standard" perhaps #318
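
An ASOF join matches each left row with the most recent right row at or before its timestamp, which is closely related to locf: joining a complete list of windows against sparse observations carries values forward. A minimal sketch over sorted lists using bisect; asof_join is hypothetical and ignores details such as per-key matching or a maximum tolerance that real ASOF join implementations may offer.

```python
import bisect


def asof_join(left_times, right):
    """right: (time, value) pairs sorted by time. For each left time, take the latest right value at or before it."""
    right_times = [t for t, _ in right]
    out = []
    for t in left_times:
        i = bisect.bisect_right(right_times, t) - 1
        out.append((t, right[i][1] if i >= 0 else None))
    return out


windows = [1, 2, 3, 4]  # e.g. day numbers of every gap-filled window
observations = [(1, 10), (3, 30)]
print(asof_join(windows, observations))
```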

Note there are a bunch of interesting timeseries optimizations such as #10316 and #10313 that could be added

7 participants