Skip to content

Commit

Permalink
DUX-823 Prices v2 rework (#7391)
Browse files Browse the repository at this point in the history
* initial v2 setup

* rename

* fix compile

* syntax

* fixxxeees

* moar fixeez

* moar fixeez

* fixezzz

* fixezzz

* fix col ref in unique

* left anti join

* add dex volume filter

* add missing select

* materialize end model

* add uniqueness tests

* fix test

* store test failures

* fix anti join syntax

* trigger CI

* test CI again

* partition by day

* remove partition

* partition by date

* try out minute mean and compare

* table

* more comparisons

* use dex source

* fix syntax

* syntax

* remove temp testing models

* syntax

* filter out USDe

* fix

* syntax

* add day level prices

* syntax

* syntax

* remove ref

* add date

* build out daily prices table

* syntax

* limit data to 30 days

* grouping

* cast sequence as timestamp

* syntax

* do 100 days

* all the daysgit push!

* leave day for follow up PR

---------

Co-authored-by: Huang Geyang <[email protected]>
  • Loading branch information
0xRobin and Hosuke authored Jan 23, 2025
1 parent e8ca05d commit 35d1d10
Show file tree
Hide file tree
Showing 10 changed files with 398 additions and 0 deletions.
50 changes: 50 additions & 0 deletions dbt_subprojects/tokens/models/prices_v2/_schema.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
version: 2

models:
- name: prices_v2_dex_minute_raw
meta:
sector: prices
contributors: 0xRob
description: "sparse minute-level prices sourced from dex.trades, not filtered"
data_tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- blockchain
- contract_address
- timestamp

- name: prices_v2_coinpaprika_minute
meta:
sector: prices
contributors: 0xRob
description: "sparse minute-level prices from coinpaprika (only trusted tokens)"
data_tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- blockchain
- contract_address
- timestamp

- name: prices_v2_minute_sparse
meta:
sector: prices
contributors: 0xRob
description: "sparse minute-level prices from all sources"
data_tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- blockchain
- contract_address
- timestamp

- name: prices_v2_day_sparse
meta:
sector: prices
contributors: 0xRob
description: "sparse day-level prices from all sources"
data_tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- blockchain
- contract_address
- timestamp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{{ config(
schema = 'prices_v2'
, alias = 'coinpaprika_minute'
, materialized = 'incremental'
, file_format = 'delta'
, partition_by = ['date']
, incremental_strategy = 'merge'
, unique_key = ['blockchain', 'contract_address', 'timestamp']
, incremental_predicates = [incremental_predicate('DBT_INTERNAL_DEST.timestamp')]
)
}}

select
ptt.blockchain
, ptt.contract_address
, p.minute as timestamp
, p.price
, cast(null as double) as volume
, 'coinpaprika' as source
, date_trunc('day', p.minute) as date --partition
from
{{ source('prices','usd_0003') }} as p -- todo: fix this source
inner join
{{ ref('prices_trusted_tokens') }} as ptt
on p.token_id = ptt.token_id
where
1=1
{% if is_incremental() %}
and {{ incremental_predicate('p.minute') }}
{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{{ config(
schema='prices_v2'
, alias = 'dex_filter'
, materialized = 'view'
)
}}

WITH dex_volume_over_10k as (
select
blockchain
,contract_address
from(
SELECT
d.blockchain,
d.token_bought_address as contract_address,
sum(d.amount_usd) as volume -- in USD
FROM {{ source('dex','trades') }} d
group by 1,2
UNION ALL
SELECT
d.blockchain,
d.token_sold_address as contract_address,
sum(d.amount_usd) as volume -- in USD
FROM {{ source('dex','trades') }} d
group by 1,2
)
group by 1,2
having sum(volume) >= 10000
)
, manual_filter as (
SELECT
blockchain,
contract_address
FROM (
VALUES
('ethereum', 0x4c9EDD5852cd905f086C759E8383e09bff1E68B3) -- USDe has bad events (ex https://etherscan.io/tx/0x0c9464ff4fea893667a43e96e830073031f5587d8f3b33fb27a8464979f12897#eventlog#151)
) as t(blockchain, contract_address)
)

select *
from dex_volume_over_10k
where (blockchain, contract_address)
not in (select blockchain, contract_address from manual_filter)
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{{ config(
schema='prices_v2'
, alias = 'dex_minute'
, materialized = 'view'
)
}}

SELECT
*
FROM {{ ref('prices_v2_dex_minute_raw') }}
INNER JOIN {{ ref('prices_v2_dex_filter') }} using (blockchain, contract_address)
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
{{ config(
schema='prices_v2'
, alias = 'dex_minute_raw'
, materialized = 'incremental'
, file_format = 'delta'
, partition_by = ['date']
, incremental_strategy = 'merge'
, unique_key = ['blockchain', 'contract_address', 'timestamp']
, incremental_predicates = [incremental_predicate('DBT_INTERNAL_DEST.timestamp')]
)
}}

WITH dex_trades_filter_and_unnest as (
SELECT
d.blockchain,
d.token_bought_address as contract_address,
d.block_time as timestamp,
d.amount_usd/d.token_bought_amount as price,
d.amount_usd as volume -- in USD
FROM {{ source('dex','trades') }} d
INNER JOIN {{ref('prices_trusted_tokens')}} t
on t.blockchain = d.blockchain
and t.contract_address = d.token_sold_address -- the token traded against is trusted
LEFT JOIN {{ref('prices_trusted_tokens')}} anti_t
on anti_t.blockchain = d.blockchain
and anti_t.contract_address = d.token_bought_address -- the subjected token is already in trusted
WHERE d.amount_usd > 0 and token_bought_amount > 0 and token_bought_address is not null
and anti_t.contract_address is null
{% if is_incremental() %}
AND {{ incremental_predicate('d.block_time') }}
{% endif %}

UNION ALL

SELECT
d.blockchain,
d.token_sold_address as contract_address,
d.block_time as timestamp,
d.amount_usd/d.token_sold_amount as price,
d.amount_usd as volume -- in USD
FROM {{ source('dex','trades') }} d
INNER JOIN {{ref('prices_trusted_tokens')}} t
on t.blockchain = d.blockchain
and t.contract_address = d.token_bought_address -- the token traded against is trusted
LEFT JOIN {{ref('prices_trusted_tokens')}} anti_t
on anti_t.blockchain = d.blockchain
and anti_t.contract_address = d.token_sold_address -- the subjected token is already in trusted
WHERE d.amount_usd > 0 and token_sold_amount > 0 and token_sold_address is not null
and anti_t.contract_address is null
{% if is_incremental() %}
AND {{ incremental_predicate('d.block_time') }}
{% endif %}
)


SELECT
blockchain,
contract_address,
date_trunc('minute',timestamp) as timestamp,
approx_percentile(price,0.5) as price, -- median
sum(volume) as volume,
'dex.trades' as source,
date_trunc('day',timestamp) as date -- partition
FROM dex_trades_filter_and_unnest
group by 1,2,3,7
81 changes: 81 additions & 0 deletions dbt_subprojects/tokens/models/prices_v2/prices_v2_day.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
{{ config(
schema='prices_v2',
alias = 'day',
tags = ['prod_exclude'],
materialized = 'incremental',
file_format = 'delta',
partition_by = ['date'],
incremental_strategy = 'merge',
unique_key = ['blockchain', 'contract_address', 'timestamp'],
incremental_predicates = [incremental_predicate('DBT_INTERNAL_DEST.timestamp')]
)
}}


--WITH sparse_prices as (
-- select
-- *
-- , lead(timestamp) over (partition by blockchain, contract_address order by timestamp asc) as next_update
-- from (
-- select
-- blockchain
-- , contract_address
-- , timestamp
-- , price
-- , volume
-- , source
-- , date -- this is redundant here as date = timestamp, but we keep it in to be consistent across intervals
-- , source_timestamp
-- from {{ ref('prices_v2_day_sparse') }}
-- {% if is_incremental() %}
-- where {{ incremental_predicate('timestamp') }}
-- {% endif %}
-- -- If we're running incremental, we also need to add the last known prices from before the incremental window, to forward fill them
-- {% if is_incremental() %}
-- UNION ALL
-- SELECT * FROM (
-- select
-- blockchain
-- , contract_address
-- , max(timestamp) -- we get the last updated price
-- , max_by(price,timestamp) as price
-- , max_by(volume,timestamp) as volume
-- , max_by(source,timestamp) as source
-- , max(date) as date
-- , max_by(source_timestamp,timestamp) as source_timestamp
-- from {{ ref('prices_v2_day_sparse') }}
-- where not {{ incremental_predicate('timestamp') }} -- not in the current incremental window (so before that)
-- group by blockchain, contract_address
-- )
-- {% endif %}
-- )
--)
--
---- construct the time spline we want to fill
--, timeseries as (
-- select timestamp
-- from unnest(
-- sequence(cast((select date_trunc('day', min(timestamp)) from sparse_prices) as timestamp)
-- , cast(date_trunc('day', now()) as timestamp)
-- , interval '1' day
-- )
-- ) as foo(timestamp)
-- {% if is_incremental() %}
-- where {{ incremental_predicate('timestamp') }} -- reduce to the incremental window if running incrementally
-- {% endif %}
--)
--
--SELECT
-- p.blockchain
-- , p.contract_address
-- , t.timestamp
-- , p.price
-- , p.volume
-- , p.source
-- , t.timestamp as date
-- , p.source_timestamp
--FROM timeseries t
--INNER JOIN sparse_prices p
-- on p.timestamp <= t.timestamp
-- and (p.next_update is null or p.next_update > t.timestamp)

23 changes: 23 additions & 0 deletions dbt_subprojects/tokens/models/prices_v2/prices_v2_latest.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{{ config(
schema='prices_v2',
alias = 'latest',
materialized = 'incremental',
file_format = 'delta',
incremental_strategy = 'merge',
unique_key = ['blockchain', 'contract_address']
)
}}


SELECT
blockchain
, contract_address
, max(timestamp) as timestamp
, max_by(price,timestamp) as price
, max_by(volume,timestamp) as volume
, max_by(source,timestamp) as source
FROM {{ ref('prices_v2_minute_sparse') }}
{% if is_incremental() %}
WHERE {{ incremental_predicate('timestamp') }}
{% endif %}
GROUP BY 1,2
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{{ config(
schema='prices_v2',
alias = 'day_sparse',
materialized = 'incremental',
file_format = 'delta',
partition_by = ['date'],
incremental_strategy = 'merge',
unique_key = ['blockchain', 'contract_address', 'timestamp'],
incremental_predicates = [incremental_predicate('DBT_INTERNAL_DEST.timestamp')]
)
}}


SELECT
blockchain
, contract_address
, date as timestamp
, max_by(price,timestamp) as price
, sum(volume) as volume
, max_by(source,timestamp) as source
, date
, max(timestamp) as source_timestamp
FROM {{ ref('prices_v2_minute_sparse') }}
{% if is_incremental() %}
WHERE {{ incremental_predicate('date') }} -- using date here makes sure we always process full days
{% endif %}
GROUP BY 1,2,3,7
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{{ config(
schema='prices_v2',
alias = 'hour_raw',
materialized = 'incremental',
file_format = 'delta',
partition_by = ['date'],
incremental_strategy = 'merge',
unique_key = ['blockchain', 'contract_address', 'timestamp'],
incremental_predicates = [incremental_predicate('DBT_INTERNAL_DEST.timestamp')]
)
}}


SELECT
blockchain
, contract_address
, date_trunc('hour',timestamp) as timestamp
, max_by(price,timestamp) as price
, sum(volume) as volume
, max_by(source,timestamp) as source
, date
, max(timestamp) as source_timestamp
FROM {{ ref('prices_v2_minute_sparse') }}
{% if is_incremental() %}
WHERE {{ incremental_predicate("date_trunc('hour',timestamp)") }} -- this makes sure we always proces full hours
{% endif %}
GROUP BY 1,2,3,7
Loading

0 comments on commit 35d1d10

Please sign in to comment.