
[EPIC] Incremental Model Improvements #10624

Open · 13 of 22 tasks
QMalcolm opened this issue Aug 28, 2024 · 3 comments

QMalcolm (Contributor) commented Aug 28, 2024

Incremental models in dbt are a materialization strategy designed to efficiently update your data warehouse tables by only transforming and loading new or changed data since the last run. Instead of processing your entire dataset every time, incremental models append or update only the new rows, significantly reducing the time and resources required for your data transformations.

Even with all the benefits of incremental models as they exist today, there are limitations to this approach, such as:

  • the burden is on YOU to calculate what's "new" - what has already been loaded, what still needs to be loaded, etc.
  • runs can be slow if you have many partitions to process (like when running in full-refresh mode) because it's done in "one big" SQL statement - it can time out, and if it fails you end up needing to retry partitions that had already succeeded
  • if you want to name a specific partition for your incremental model to process, you have to add additional "hacky" logic, likely using vars
  • data tests run on your entire model, rather than just the "new" data

In this project we're aiming to make incremental models easier to implement and more efficient to run.
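The "calculate what's new" burden described above typically shows up as a hand-written `is_incremental()` filter. A minimal sketch (the model and column names here are made up for illustration; `is_incremental()`, `config()`, and `{{ this }}` are standard dbt):

```sql
-- models/events_summary.sql (hypothetical model)
{{ config(materialized='incremental', unique_key='event_id') }}

select
    event_id,
    user_id,
    created_at
from {{ ref('stg_events') }}

{% if is_incremental() %}
  -- YOU define what "new" means, usually by comparing against
  -- the high-water mark already present in the target table.
  where created_at > (select max(created_at) from {{ this }})
{% endif %}
```

Note that this one predicate carries all the correctness burden: if a row arrives late with an older `created_at`, this filter silently skips it.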

P0s

(17 tracked sub-issues; titles were not captured in this export. Assignees are MichelleArk and QMalcolm, with several tagged "enhancement".)

P1s

P2s

MaartenN1234 commented

Just so I understand: is it right that this issue seeks to address technical (performance/load) issues in models that take just a single ref as their source (or, if a model has other sources as well, we assume them to be stale)?

I am looking for ways to support incremental processing of multi-table join models (e.g. https://discourse.getdbt.com/t/template-for-complex-incremental-models/10054, but I've seen many more similar help requests on community forums). To be sure, such features will not be in scope, right?

QMalcolm (Contributor, Author) commented

> Just so I understand: is it right that this issue seeks to address technical (performance/load) issues in models that take just a single ref as their source (or, if a model has other sources as well, we assume them to be stale)?
>
> I am looking for ways to support incremental processing of multi-table join models (e.g. https://discourse.getdbt.com/t/template-for-complex-incremental-models/10054, but I've seen many more similar help requests on community forums). To be sure, such features will not be in scope, right?

@MaartenN1234 I'm not sure that I fully understand the question being asked. For my clarity, is the question whether this new functionality will support more than one input to an incremental model? If so, the answer is yes!

For example, say we turn the jaffle-shop customers model into an incremental microbatch model. It'd look like the following:

{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='created_at', batch_size='day') }}

with

customers as (
    select * from {{ ref('stg_customers') }}
),

orders as (
    select * from {{ ref('orders') }}
),

customer_orders_summary as (
    select
        orders.customer_id,
        count(distinct orders.order_id) as count_lifetime_orders,
        count(distinct orders.order_id) > 1 as is_repeat_buyer,
        min(orders.ordered_at) as first_ordered_at,
        max(orders.ordered_at) as last_ordered_at,
        sum(orders.subtotal) as lifetime_spend_pretax,
        sum(orders.tax_paid) as lifetime_tax_paid,
        sum(orders.order_total) as lifetime_spend
    from orders
    group by 1
),

joined as (
    select
        customers.*,
        customer_orders_summary.count_lifetime_orders,
        customer_orders_summary.first_ordered_at,
        customer_orders_summary.last_ordered_at,
        customer_orders_summary.lifetime_spend_pretax,
        customer_orders_summary.lifetime_tax_paid,
        customer_orders_summary.lifetime_spend,
        case
            when customer_orders_summary.is_repeat_buyer then 'returning'
            else 'new'
        end as customer_type
    from customers

    left join customer_orders_summary
        on customers.customer_id = customer_orders_summary.customer_id
)

select * from joined

If the models orders and stg_customers both have an event_time defined (they don't need to be incremental themselves), then they will automatically be filtered and batched by the generated event time filters.
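To make that concrete: conceptually (the exact SQL dbt generates may differ by adapter, and the dates below are invented for illustration), for one day-sized batch each `event_time`-aware input is wrapped in a time-bounded subquery before the model's own SQL runs against it:

```sql
-- Illustrative only: the kind of filter the microbatch strategy
-- injects around each input for a single example batch.
select * from (
    select * from analytics.stg_customers  -- hypothetical schema
    where created_at >= '2024-08-28'       -- batch start
      and created_at <  '2024-08-29'       -- batch end
) as stg_customers
```

Each batch is then run as its own statement, which is what allows retrying only the failed batches instead of re-running "one big" query.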

MaartenN1234 commented Sep 17, 2024

> [quotes QMalcolm's reply above in full]

The critical requirement for me is that matching rows (on the join condition) in the two sources are not necessarily created in the same batch. So when the filter is applied to each source independently:

select * from {{ ref('stg_customers') }} where event_time > last_processed_event_time

and

select * from {{ ref('orders') }} where event_time > last_processed_event_time

things will go wrong (e.g. if we load one more order, we lose all previous orders from the aggregate; or, when the customer data is updated while no new orders for that customer are being processed, the update will not be propagated).

To get it right, it should become something like this:

select * from {{ ref('stg_customers') }}
where event_time > last_processed_event_time
   or customer_id in (select customer_id from {{ ref('orders') }}
                      where event_time > last_processed_event_time)

and

select * from {{ ref('orders') }}
where customer_id in (select customer_id from {{ ref('stg_customers') }}
                      where event_time > last_processed_event_time
                      union all
                      select customer_id from {{ ref('orders') }}
                      where event_time > last_processed_event_time)

So the join clause and the aggregation need to be incorporated into the change detection.
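The failure mode described above can be shown with a tiny Python simulation of an incrementally maintained per-customer order count (all names and data here are made up; this is not dbt's implementation, just the arithmetic of the problem):

```python
from collections import defaultdict

# Two orders for the same customer, arriving in different batches.
orders = [
    {"order_id": 1, "customer_id": "a", "event_time": 1},
    {"order_id": 2, "customer_id": "a", "event_time": 2},  # arrives in batch 2
]

def naive_rebuild(orders, last_processed):
    """Filter the source by event_time only, then aggregate per customer."""
    new_orders = [o for o in orders if o["event_time"] > last_processed]
    counts = defaultdict(int)
    for o in new_orders:
        counts[o["customer_id"]] += 1
    return dict(counts)

def join_aware_rebuild(orders, last_processed):
    """Re-read *all* orders for any customer that has at least one new row."""
    changed = {o["customer_id"] for o in orders if o["event_time"] > last_processed}
    counts = defaultdict(int)
    for o in orders:
        if o["customer_id"] in changed:
            counts[o["customer_id"]] += 1
    return dict(counts)

# Batch 2 (last_processed_event_time = 1):
print(naive_rebuild(orders, 1))       # {'a': 1} - order 1 is lost from the aggregate
print(join_aware_rebuild(orders, 1))  # {'a': 2} - correct lifetime count
```

The naive rebuild overwrites customer "a"'s lifetime count with a count of only the new rows, which is exactly the "we would lose all previous orders from the aggregate" problem; widening the filter to every customer touched by a new row restores correctness at the cost of re-reading those customers' history.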
