[EPIC] Incremental Model Improvements - Microbatch #10624

Closed
59 of 70 tasks
QMalcolm opened this issue Aug 28, 2024 · 5 comments · Fixed by #10751

@QMalcolm

QMalcolm commented Aug 28, 2024

Incremental models in dbt are a materialization strategy designed to efficiently update your data warehouse tables by transforming and loading only the data that is new or has changed since the last run. Instead of processing your entire dataset every time, incremental models append or update only the new rows, significantly reducing the time and resources required for your data transformations.

Even with all the benefits of incremental models as they exist today, there are limitations with this approach, such as:

  • burden is on YOU to calculate what’s “new” - what has already been loaded, what needs to be loaded, etc.
  • can be slow if you have many partitions to process (like when running in full-refresh mode), because it’s all done in “one big” SQL statement; that statement can time out, and if it fails you end up retrying partitions that had already succeeded
  • if you want to specifically name a partition for your incremental model to process, you have to add additional “hack”y logic, likely using vars
  • data tests run on your entire model, rather than just the "new" data

In this project we're aiming to make incremental models easier to implement and more efficient to run.

P0s - Core

P0s - Core Framework

[Task list: 22 items, titles not preserved; assignees MichelleArk and QMalcolm; labels include enhancement and user docs]

P0s - Adapters

[Task list: 14 items, titles not preserved; assignees MichelleArk and QMalcolm; one item labeled feature:microbatch, pkg:dbt-athena, type:enhancement]

Bugs

Beta bugs

[Task list: 17 items, titles not preserved; assignees MichelleArk, QMalcolm, and jtcohen6; labels are mostly bug and microbatch]

P1s

[Task list: 6 items, titles not preserved; assignees MichelleArk and QMalcolm; labels are mostly enhancement and microbatch]

P2s

@MaartenN1234

Just for my understanding: is it right that this issue seeks to address technical (performance/load) issues in models that take just one single ref as source (or, if they have other sources as well, we assume them to be stale)?

I am looking for ways to support incremental processing of multi-table join models (e.g. https://discourse.getdbt.com/t/template-for-complex-incremental-models/10054, but I've seen many more similar help requests on community forums). To be sure, such features will not be in scope, right?

@QMalcolm

@MaartenN1234 I'm not sure that I fully understand the question being asked. For my clarity, is the question whether this new functionality will support more than one input to an incremental model? If so, the answer is yes!

For example, say we turn the jaffle-shop customers model into an incremental microbatch model. It'd look like the following:

{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='created_at', batch_size='day') }}

with

customers as (
    select * from {{ ref('stg_customers') }}
),

orders as (
    select * from {{ ref('orders') }}
),

customer_orders_summary as (
    select
        orders.customer_id,
        count(distinct orders.order_id) as count_lifetime_orders,
        count(distinct orders.order_id) > 1 as is_repeat_buyer,
        min(orders.ordered_at) as first_ordered_at,
        max(orders.ordered_at) as last_ordered_at,
        sum(orders.subtotal) as lifetime_spend_pretax,
        sum(orders.tax_paid) as lifetime_tax_paid,
        sum(orders.order_total) as lifetime_spend
    from orders
    group by 1
),

joined as (
    select
        customers.*,
        customer_orders_summary.count_lifetime_orders,
        customer_orders_summary.first_ordered_at,
        customer_orders_summary.last_ordered_at,
        customer_orders_summary.lifetime_spend_pretax,
        customer_orders_summary.lifetime_tax_paid,
        customer_orders_summary.lifetime_spend,
        case
            when customer_orders_summary.is_repeat_buyer then 'returning'
            else 'new'
        end as customer_type
    from customers

    left join customer_orders_summary
        on customers.customer_id = customer_orders_summary.customer_id
)

select * from joined

If the models orders and stg_customers both have an event_time defined (they don't need to be incremental themselves), then they will automatically be filtered and batched by the generated event time filters.
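
To make the "filtered and batched" behavior above concrete, here is a minimal Python sketch of how one half-open event-time window per day could be derived and turned into a filter on each upstream model. It illustrates the idea only and is not dbt's implementation; `daily_batches`, `event_time_filter`, and the choice of `ordered_at` and `created_at` as the upstream event_time columns are assumptions made for this example.

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S"


def daily_batches(start, end):
    """Yield (batch_start, batch_end) windows, one per calendar day.

    Each window is half-open: batch_start is included, batch_end is excluded.
    The first window starts at midnight of the day that contains `start`.
    """
    cursor = start.replace(hour=0, minute=0, second=0, microsecond=0)
    while cursor < end:
        yield cursor, cursor + timedelta(days=1)
        cursor += timedelta(days=1)


def event_time_filter(relation, event_time_column, batch_start, batch_end):
    """Build the SQL that restricts one upstream relation to one batch window."""
    lower = batch_start.strftime(FMT)
    upper = batch_end.strftime(FMT)
    return (
        f"select * from {relation} "
        f"where {event_time_column} >= '{lower}' "
        f"and {event_time_column} < '{upper}'"
    )


if __name__ == "__main__":
    # Hypothetical event_time columns for the two upstream models.
    upstream = {"stg_customers": "created_at", "orders": "ordered_at"}
    window = (datetime(2024, 9, 1, 13, 0), datetime(2024, 9, 3, 2, 0))
    for batch_start, batch_end in daily_batches(*window):
        for relation, column in upstream.items():
            print(event_time_filter(relation, column, batch_start, batch_end))
```

In this sketch every input with an event_time column gets the same window for a given batch, so each batch is an independent, separately runnable query.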

@MaartenN1234

MaartenN1234 commented Sep 17, 2024

The critical requirement for me is that matching rows (on the join condition) in both sources are not necessarily created in the same batch. So when the filter is applied to the sources independently:
select * from {{ ref('stg_customers') }} where event_time > last_processed_event_time
and
select * from {{ ref('orders') }} where event_time > last_processed_event_time

the results will be wrong (e.g. if we load one more order, we lose all previous orders from the aggregate; or, when the customer data is updated while there are no new orders for that customer to process, the update will not be propagated).

To get it right, it should become something like this:
select * from {{ ref('stg_customers') }} where event_time > last_processed_event_time or (customer_id IN ( select customer_id from {{ ref('orders') }} where event_time > last_processed_event_time))
and
select * from {{ ref('orders') }} where (customer_id IN ( select customer_id from {{ ref('stg_customers') }} where event_time > last_processed_event_time UNION ALL select customer_id from {{ ref('orders') }} where event_time > last_processed_event_time))

So one needs to incorporate the join clause and the aggregation into the change detection.
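
The difference between the two filtering approaches above can be checked on toy data. The sketch below uses Python's built-in sqlite3 module with a made-up two-customer dataset; the table contents, the cutoff value, and the reduced column set are assumptions for illustration, and it demonstrates the proposal in this comment rather than how dbt microbatch filters its inputs.

```python
import sqlite3

# Toy data (hypothetical): customer 1 gets a new order after the cutoff,
# customer 2 has its customer row updated after the cutoff but no new orders.
SETUP = """
create table stg_customers (customer_id integer, event_time text);
create table orders (order_id integer, customer_id integer,
                     order_total real, event_time text);
insert into stg_customers values (1, '2024-09-01'), (2, '2024-09-03');
insert into orders values
    (10, 1, 20.0, '2024-09-01'),
    (11, 1,  5.0, '2024-09-03'),
    (20, 2,  7.0, '2024-09-01');
"""

# Each source is filtered on its own event_time, independently of the other.
INDEPENDENT = """
with customers_new as (
    select * from stg_customers where event_time > :cutoff
),
orders_new as (
    select * from orders where event_time > :cutoff
),
summary as (
    select customer_id,
           count(distinct order_id) as count_lifetime_orders,
           sum(order_total) as lifetime_spend
    from orders_new
    group by 1
)
select c.customer_id, s.count_lifetime_orders, s.lifetime_spend
from customers_new c
left join summary s on c.customer_id = s.customer_id
order by c.customer_id
"""

# First collect every join key touched by a change in either source,
# then read the full history for those keys from both sources.
KEY_AWARE = """
with changed_keys as (
    select customer_id from stg_customers where event_time > :cutoff
    union
    select customer_id from orders where event_time > :cutoff
),
customers_changed as (
    select * from stg_customers
    where customer_id in (select customer_id from changed_keys)
),
summary as (
    select customer_id,
           count(distinct order_id) as count_lifetime_orders,
           sum(order_total) as lifetime_spend
    from orders
    where customer_id in (select customer_id from changed_keys)
    group by 1
)
select c.customer_id, s.count_lifetime_orders, s.lifetime_spend
from customers_changed c
left join summary s on c.customer_id = s.customer_id
order by c.customer_id
"""


def run(sql, cutoff="2024-09-02"):
    """Load the toy data into a fresh in-memory database and run one query."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SETUP)
    rows = conn.execute(sql, {"cutoff": cutoff}).fetchall()
    conn.close()
    return rows


if __name__ == "__main__":
    print(run(INDEPENDENT))  # [(2, None, None)]
    print(run(KEY_AWARE))    # [(1, 2, 25.0), (2, 1, 7.0)]
```

With independent filters, customer 1 is missing even though it has a new order, and customer 2 loses its earlier order from the aggregate. With the key-aware filters, both customers come back with their full lifetime totals.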

@QMalcolm

QMalcolm commented Oct 3, 2024

Sorry for accidentally closing this as completed last week. As penance, here is a photo of my cat Misu. He is very excited about microbatch models

IMG_4238

@QMalcolm

QMalcolm commented Jan 7, 2025

We're gonna close this epic as the microbatch feature has been generally released in 1.9.0. There are some follow-up issues, but the epic can still be closed.

QMalcolm closed this as completed Jan 7, 2025