diff --git a/dbt_subprojects/tokens/models/prices_v2/_schema.yml b/dbt_subprojects/tokens/models/prices_v2/_schema.yml new file mode 100644 index 00000000000..e1c1fe5f935 --- /dev/null +++ b/dbt_subprojects/tokens/models/prices_v2/_schema.yml @@ -0,0 +1,50 @@ +version: 2 + +models: + - name: prices_v2_dex_minute_raw + meta: + sector: prices + contributors: 0xRob + description: "sparse minute-level prices sourced from dex.trades, not filtered" + data_tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - blockchain + - contract_address + - timestamp + + - name: prices_v2_coinpaprika_minute + meta: + sector: prices + contributors: 0xRob + description: "sparse minute-level prices from coinpaprika (only trusted tokens)" + data_tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - blockchain + - contract_address + - timestamp + + - name: prices_v2_minute_sparse + meta: + sector: prices + contributors: 0xRob + description: "sparse minute-level prices from all sources" + data_tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - blockchain + - contract_address + - timestamp + + - name: prices_v2_day_sparse + meta: + sector: prices + contributors: 0xRob + description: "sparse day-level prices from all sources" + data_tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - blockchain + - contract_address + - timestamp diff --git a/dbt_subprojects/tokens/models/prices_v2/minute_inputs/prices_v2_coinpaprika_minute.sql b/dbt_subprojects/tokens/models/prices_v2/minute_inputs/prices_v2_coinpaprika_minute.sql new file mode 100644 index 00000000000..4e2a1ae711f --- /dev/null +++ b/dbt_subprojects/tokens/models/prices_v2/minute_inputs/prices_v2_coinpaprika_minute.sql @@ -0,0 +1,30 @@ +{{ config( + schema = 'prices_v2' + , alias = 'coinpaprika_minute' + , materialized = 'incremental' + , file_format = 'delta' + , partition_by = ['date'] + , incremental_strategy = 'merge' + , unique_key = ['blockchain', 'contract_address', 'timestamp'] + , incremental_predicates = [incremental_predicate('DBT_INTERNAL_DEST.timestamp')] + ) +}} + +select + ptt.blockchain + , ptt.contract_address + , p.minute as timestamp + , p.price + , cast(null as double) as volume + , 'coinpaprika' as source + , date_trunc('day', p.minute) as date --partition +from + {{ source('prices','usd_0003') }} as p -- todo: fix this source +inner join + {{ ref('prices_trusted_tokens') }} as ptt + on p.token_id = ptt.token_id +where + 1=1 + {% if is_incremental() %} + and {{ incremental_predicate('p.minute') }} + {% endif %} diff --git a/dbt_subprojects/tokens/models/prices_v2/minute_inputs/prices_v2_dex_filter.sql b/dbt_subprojects/tokens/models/prices_v2/minute_inputs/prices_v2_dex_filter.sql new file mode 100644 index 00000000000..c7e71747b73 --- /dev/null +++ b/dbt_subprojects/tokens/models/prices_v2/minute_inputs/prices_v2_dex_filter.sql @@ -0,0 +1,43 @@ +{{ config( + schema='prices_v2' + , alias = 'dex_filter' + , materialized = 'view' + ) +}} + +WITH dex_volume_over_10k as ( + select + blockchain + ,contract_address + from( + SELECT + d.blockchain, + d.token_bought_address as contract_address, + sum(d.amount_usd) as volume -- in USD + FROM {{ source('dex','trades') }} d + group by 1,2 + UNION ALL + SELECT + d.blockchain, + d.token_sold_address as contract_address, + sum(d.amount_usd) as volume -- in USD + FROM {{ source('dex','trades') }} d + group by 1,2 + ) + group by 1,2 + having sum(volume) >= 10000 +) +, manual_filter as ( + SELECT + blockchain, + contract_address + FROM ( + VALUES + ('ethereum', 0x4c9EDD5852cd905f086C759E8383e09bff1E68B3) -- USDe has bad events (ex https://etherscan.io/tx/0x0c9464ff4fea893667a43e96e830073031f5587d8f3b33fb27a8464979f12897#eventlog#151) + ) as t(blockchain, contract_address) +) + +select * +from dex_volume_over_10k +where (blockchain, contract_address) + not in (select blockchain, contract_address from manual_filter) diff --git a/dbt_subprojects/tokens/models/prices_v2/minute_inputs/prices_v2_dex_minute.sql b/dbt_subprojects/tokens/models/prices_v2/minute_inputs/prices_v2_dex_minute.sql new file mode 100644 index 00000000000..b354c312f18 --- /dev/null +++ b/dbt_subprojects/tokens/models/prices_v2/minute_inputs/prices_v2_dex_minute.sql @@ -0,0 +1,11 @@ +{{ config( + schema='prices_v2' + , alias = 'dex_minute' + , materialized = 'view' + ) +}} + +SELECT +* +FROM {{ ref('prices_v2_dex_minute_raw') }} +INNER JOIN {{ ref('prices_v2_dex_filter') }} using (blockchain, contract_address) diff --git a/dbt_subprojects/tokens/models/prices_v2/minute_inputs/prices_v2_dex_minute_raw.sql b/dbt_subprojects/tokens/models/prices_v2/minute_inputs/prices_v2_dex_minute_raw.sql new file mode 100644 index 00000000000..f9466fbe0fb --- /dev/null +++ b/dbt_subprojects/tokens/models/prices_v2/minute_inputs/prices_v2_dex_minute_raw.sql @@ -0,0 +1,65 @@ +{{ config( + schema='prices_v2' + , alias = 'dex_minute_raw' + , materialized = 'incremental' + , file_format = 'delta' + , partition_by = ['date'] + , incremental_strategy = 'merge' + , unique_key = ['blockchain', 'contract_address', 'timestamp'] + , incremental_predicates = [incremental_predicate('DBT_INTERNAL_DEST.timestamp')] + ) +}} + +WITH dex_trades_filter_and_unnest as ( + SELECT + d.blockchain, + d.token_bought_address as contract_address, + d.block_time as timestamp, + d.amount_usd/d.token_bought_amount as price, + d.amount_usd as volume -- in USD + FROM {{ source('dex','trades') }} d + INNER JOIN {{ref('prices_trusted_tokens')}} t + on t.blockchain = d.blockchain + and t.contract_address = d.token_sold_address -- the token traded against is trusted + LEFT JOIN {{ref('prices_trusted_tokens')}} anti_t + on anti_t.blockchain = d.blockchain + and anti_t.contract_address = d.token_bought_address -- the subjected token is already in trusted + WHERE d.amount_usd > 0 and token_bought_amount > 0 and token_bought_address is not null + and anti_t.contract_address is null + {% if is_incremental() %} + AND {{ incremental_predicate('d.block_time') }} + {% endif %} + + UNION ALL + + SELECT + d.blockchain, + d.token_sold_address as contract_address, + d.block_time as timestamp, + d.amount_usd/d.token_sold_amount as price, + d.amount_usd as volume -- in USD + FROM {{ source('dex','trades') }} d + INNER JOIN {{ref('prices_trusted_tokens')}} t + on t.blockchain = d.blockchain + and t.contract_address = d.token_bought_address -- the token traded against is trusted + LEFT JOIN {{ref('prices_trusted_tokens')}} anti_t + on anti_t.blockchain = d.blockchain + and anti_t.contract_address = d.token_sold_address -- the subjected token is already in trusted + WHERE d.amount_usd > 0 and token_sold_amount > 0 and token_sold_address is not null + and anti_t.contract_address is null + {% if is_incremental() %} + AND {{ incremental_predicate('d.block_time') }} + {% endif %} +) + + +SELECT + blockchain, + contract_address, + date_trunc('minute',timestamp) as timestamp, + approx_percentile(price,0.5) as price, -- median + sum(volume) as volume, + 'dex.trades' as source, + date_trunc('day',timestamp) as date -- partition +FROM dex_trades_filter_and_unnest +group by 1,2,3,7 diff --git a/dbt_subprojects/tokens/models/prices_v2/prices_v2_day.sql b/dbt_subprojects/tokens/models/prices_v2/prices_v2_day.sql new file mode 100644 index 00000000000..7aa82d2e096 --- /dev/null +++ b/dbt_subprojects/tokens/models/prices_v2/prices_v2_day.sql @@ -0,0 +1,81 @@ +{{ config( + schema='prices_v2', + alias = 'day', + tags = ['prod_exclude'], + materialized = 'incremental', + file_format = 'delta', + partition_by = ['date'], + incremental_strategy = 'merge', + unique_key = ['blockchain', 'contract_address', 'timestamp'], + incremental_predicates = [incremental_predicate('DBT_INTERNAL_DEST.timestamp')] + ) +}} + + +--WITH sparse_prices as ( +-- select +-- * +-- , lead(timestamp) over (partition by blockchain, contract_address order by timestamp asc) as next_update +-- from ( +-- select +-- blockchain +-- , contract_address +-- , timestamp +-- , price +-- , volume +-- , source +-- , date -- this is redundant here as date = timestamp, but we keep it in to be consistent across intervals +-- , source_timestamp +-- from {{ ref('prices_v2_day_sparse') }} +-- {% if is_incremental() %} +-- where {{ incremental_predicate('timestamp') }} +-- {% endif %} +-- -- If we're running incremental, we also need to add the last known prices from before the incremental window, to forward fill them +-- {% if is_incremental() %} +-- UNION ALL +-- SELECT * FROM ( +-- select +-- blockchain +-- , contract_address +-- , max(timestamp) -- we get the last updated price +-- , max_by(price,timestamp) as price +-- , max_by(volume,timestamp) as volume +-- , max_by(source,timestamp) as source +-- , max(date) as date +-- , max_by(source_timestamp,timestamp) as source_timestamp +-- from {{ ref('prices_v2_day_sparse') }} +-- where not {{ incremental_predicate('timestamp') }} -- not in the current incremental window (so before that) +-- group by blockchain, contract_address +-- ) +-- {% endif %} +-- ) +--) +-- +---- construct the time spline we want to fill +--, timeseries as ( +-- select timestamp +-- from unnest( +-- sequence(cast((select date_trunc('day', min(timestamp)) from sparse_prices) as timestamp) +-- , cast(date_trunc('day', now()) as timestamp) +-- , interval '1' day +-- ) +-- ) as foo(timestamp) +-- {% if is_incremental() %} +-- where {{ incremental_predicate('timestamp') }} -- reduce to the incremental window if running incrementally +-- {% endif %} +--) +-- +--SELECT +-- p.blockchain +-- , p.contract_address +-- , t.timestamp +-- , p.price +-- , p.volume +-- , p.source +-- , t.timestamp as date +-- , p.source_timestamp +--FROM timeseries t +--INNER JOIN sparse_prices p +-- on p.timestamp <= t.timestamp +-- and (p.next_update is null or p.next_update > t.timestamp) + diff --git a/dbt_subprojects/tokens/models/prices_v2/prices_v2_latest.sql b/dbt_subprojects/tokens/models/prices_v2/prices_v2_latest.sql new file mode 100644 index 00000000000..3a65637d4a9 --- /dev/null +++ b/dbt_subprojects/tokens/models/prices_v2/prices_v2_latest.sql @@ -0,0 +1,23 @@ +{{ config( + schema='prices_v2', + alias = 'latest', + materialized = 'incremental', + file_format = 'delta', + incremental_strategy = 'merge', + unique_key = ['blockchain', 'contract_address'] + ) +}} + + +SELECT + blockchain + , contract_address + , max(timestamp) as timestamp + , max_by(price,timestamp) as price + , max_by(volume,timestamp) as volume + , max_by(source,timestamp) as source +FROM {{ ref('prices_v2_minute_sparse') }} +{% if is_incremental() %} +WHERE {{ incremental_predicate('timestamp') }} +{% endif %} +GROUP BY 1,2 diff --git a/dbt_subprojects/tokens/models/prices_v2/sparse_prices/prices_v2_day_sparse.sql b/dbt_subprojects/tokens/models/prices_v2/sparse_prices/prices_v2_day_sparse.sql new file mode 100644 index 00000000000..dc43d39c3c3 --- /dev/null +++ b/dbt_subprojects/tokens/models/prices_v2/sparse_prices/prices_v2_day_sparse.sql @@ -0,0 +1,27 @@ +{{ config( + schema='prices_v2', + alias = 'day_sparse', + materialized = 'incremental', + file_format = 'delta', + partition_by = ['date'], + incremental_strategy = 'merge', + unique_key = ['blockchain', 'contract_address', 'timestamp'], + incremental_predicates = [incremental_predicate('DBT_INTERNAL_DEST.timestamp')] + ) +}} + + +SELECT + blockchain + , contract_address + , date as timestamp + , max_by(price,timestamp) as price + , sum(volume) as volume + , max_by(source,timestamp) as source + , date + , max(timestamp) as source_timestamp +FROM {{ ref('prices_v2_minute_sparse') }} +{% if is_incremental() %} +WHERE {{ incremental_predicate('date') }} -- using date here makes sure we always process full days +{% endif %} +GROUP BY 1,2,3,7 diff --git a/dbt_subprojects/tokens/models/prices_v2/sparse_prices/prices_v2_hour_sparse.sql b/dbt_subprojects/tokens/models/prices_v2/sparse_prices/prices_v2_hour_sparse.sql new file mode 100644 index 00000000000..b99bf078759 --- /dev/null +++ b/dbt_subprojects/tokens/models/prices_v2/sparse_prices/prices_v2_hour_sparse.sql @@ -0,0 +1,27 @@ +{{ config( + schema='prices_v2', + alias = 'hour_raw', + materialized = 'incremental', + file_format = 'delta', + partition_by = ['date'], + incremental_strategy = 'merge', + unique_key = ['blockchain', 'contract_address', 'timestamp'], + incremental_predicates = [incremental_predicate('DBT_INTERNAL_DEST.timestamp')] + ) +}} + + +SELECT + blockchain + , contract_address + , date_trunc('hour',timestamp) as timestamp + , max_by(price,timestamp) as price + , sum(volume) as volume + , max_by(source,timestamp) as source + , date + , max(timestamp) as source_timestamp +FROM {{ ref('prices_v2_minute_sparse') }} +{% if is_incremental() %} +WHERE {{ incremental_predicate("date_trunc('hour',timestamp)") }} -- this makes sure we always proces full hours +{% endif %} +GROUP BY 1,2,3,7 diff --git a/dbt_subprojects/tokens/models/prices_v2/sparse_prices/prices_v2_minute_sparse.sql b/dbt_subprojects/tokens/models/prices_v2/sparse_prices/prices_v2_minute_sparse.sql new file mode 100644 index 00000000000..912df7f721b --- /dev/null +++ b/dbt_subprojects/tokens/models/prices_v2/sparse_prices/prices_v2_minute_sparse.sql @@ -0,0 +1,41 @@ +{{ config( + schema='prices_v2', + alias = 'minute_sparse', + file_format = 'delta', + materialized = 'incremental', + partition_by = ['date'], + incremental_strategy = 'merge', + unique_key = ['blockchain', 'contract_address', 'timestamp'], + incremental_predicates = [incremental_predicate('DBT_INTERNAL_DEST.timestamp')] + ) +}} + +-- this model feeds into sqlmesh which performs a forward fill and aggregates upto higher timeframes + +{% set prices_models = [ + ref('prices_v2_dex_minute') + ,ref('prices_v2_coinpaprika_minute') +] %} + + +SELECT * +FROM +( + {% for model in prices_models %} + SELECT + blockchain + , contract_address + , timestamp + , price + , volume -- can be null + , source -- dex.trades/coinpaprika/.. + , date -- partition + FROM {{ model }} + {% if is_incremental() %} + WHERE {{ incremental_predicate('timestamp') }} + {% endif %} + {% if not loop.last %} + UNION ALL + {% endif %} + {% endfor %} +)