Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DUX-823 Prices v2 rework #7391

Open
wants to merge 44 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
2a979c2
initial v2 setup
0xRobin Dec 27, 2024
322068e
rename
0xRobin Dec 27, 2024
a517883
fix compile
0xRobin Dec 27, 2024
9468c58
syntax
0xRobin Dec 27, 2024
37668fb
fixxxeees
0xRobin Dec 27, 2024
4a07aeb
moar fixeez
0xRobin Dec 27, 2024
b992a88
moar fixeez
0xRobin Dec 27, 2024
7d1288c
fixezzz
0xRobin Dec 27, 2024
181c3cd
fixezzz
0xRobin Dec 27, 2024
2dec8a4
fix col ref in unique
0xRobin Dec 27, 2024
5d6c2a5
left anti join
0xRobin Dec 27, 2024
f3a7475
Merge branch 'main' into prices-v2-rework
0xRobin Dec 27, 2024
cb5eb24
Merge branch 'main' into prices-v2-rework
Hosuke Jan 2, 2025
be35560
add dex volume filter
0xRobin Jan 3, 2025
75a9ea2
add missing select
0xRobin Jan 3, 2025
fdf25cc
materialize end model
0xRobin Jan 3, 2025
1ca0312
add uniqueness tests
0xRobin Jan 3, 2025
c64c3a9
fix test
0xRobin Jan 3, 2025
7cb513c
store test failures
0xRobin Jan 3, 2025
d8fa7c2
fix anti join syntax
0xRobin Jan 3, 2025
c62d7f9
Merge branch 'main' into prices-v2-rework
0xRobin Jan 3, 2025
b227334
trigger CI
0xRobin Jan 3, 2025
568c288
test CI again
0xRobin Jan 3, 2025
d86f7e4
partition by day
0xRobin Jan 3, 2025
0570ca2
remove partition
0xRobin Jan 3, 2025
485dc1c
Merge branch 'main' into prices-v2-rework
0xRobin Jan 6, 2025
e023c9c
partition by date
0xRobin Jan 6, 2025
a3f0174
try out minute mean and compare
0xRobin Jan 7, 2025
5a0ccbf
table
0xRobin Jan 7, 2025
498c681
more comparisons
0xRobin Jan 7, 2025
0dbe248
use dex source
0xRobin Jan 7, 2025
8016156
fix syntax
0xRobin Jan 7, 2025
300c06a
syntax
0xRobin Jan 7, 2025
a4b36f6
remove temp testing models
0xRobin Jan 7, 2025
5b65c74
Merge branch 'main' into prices-v2-rework
0xRobin Jan 7, 2025
06296c9
syntax
0xRobin Jan 7, 2025
a4cdb00
filter out USDe
0xRobin Jan 8, 2025
a886e22
fix
0xRobin Jan 8, 2025
c73ebe1
syntax
0xRobin Jan 8, 2025
9385bfa
add day level prices
0xRobin Jan 8, 2025
c6f6cb5
syntax
0xRobin Jan 8, 2025
29cdb61
syntax
0xRobin Jan 8, 2025
8875b74
remove ref
0xRobin Jan 8, 2025
641ae1e
add date
0xRobin Jan 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions dbt_subprojects/tokens/models/prices_v2/_schema.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
version: 2

models:
- name: prices_v2_dex_minute_raw
meta:
sector: prices
contributors: 0xRob
description: "sparse minute-level prices sourced from dex.trades"
data_tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- blockchain
- contract_address
- timestamp

- name: prices_v2_coinpaprika_minute
meta:
sector: prices
contributors: 0xRob
description: "sparse minute-level prices from coinpaprika (only trusted tokens)"
data_tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- blockchain
- contract_address
- timestamp

- name: prices_v2_minute_raw
meta:
sector: prices
contributors: 0xRob
description: "sparse minute-level prices from all sources"
data_tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- blockchain
- contract_address
- timestamp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{{ config(
schema = 'prices_v2'
, alias = 'coinpaprika_minute'
, materialized = 'incremental'
, file_format = 'delta'
, partition_by = ['date']
, incremental_strategy = 'merge'
, unique_key = ['blockchain', 'contract_address', 'timestamp']
, incremental_predicates = [incremental_predicate('DBT_INTERNAL_DEST.timestamp')]
)
}}

select
ptt.blockchain
, ptt.contract_address
, p.minute as timestamp
, p.price
, cast(null as double) as volume
, 'coinpaprika' as source
, date_trunc('day', p.minute) as date --partition
from
{{ source('prices','usd_0003') }} as p -- todo: fix this source
inner join
{{ ref('prices_trusted_tokens') }} as ptt
on p.token_id = ptt.token_id
{% if is_incremental() %}
where
{{ incremental_predicate('p.minute') }}
{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{{ config(
schema='prices_v2'
, alias = 'dex_filter'
, materialized = 'view'
)
}}

WITH dex_volume_over_10k as (
select
blockchain
,contract_address
from(
SELECT
d.blockchain,
d.token_bought_address as contract_address,
sum(d.amount_usd) as volume -- in USD
FROM {{ source('dex','trades') }} d
group by 1,2
UNION ALL
SELECT
d.blockchain,
d.token_sold_address as contract_address,
sum(d.amount_usd) as volume -- in USD
FROM {{ source('dex','trades') }} d
group by 1,2
)
group by 1,2
having sum(volume) >= 10000
)

select *
from dex_volume_over_10k
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{{ config(
schema='prices_v2'
, alias = 'dex_minute'
, materialized = 'view'
)
}}

SELECT
*
FROM {{ ref('prices_v2_dex_minute_raw') }}
INNER JOIN {{ ref('prices_v2_dex_filter') }} using (blockchain, contract_address)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we do this filtering below in prices_v2_dex_minute_raw directly instead?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

opted to keep prices_v2_dex_minute_raw a clean union model

Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
{{ config(
schema='prices_v2'
, alias = 'dex_minute_raw'
, materialized = 'incremental'
, file_format = 'delta'
, partition_by = ['date']
, incremental_strategy = 'merge'
, unique_key = ['blockchain', 'contract_address', 'timestamp']
, incremental_predicates = [incremental_predicate('DBT_INTERNAL_DEST.timestamp')]
)
}}

WITH dex_trades_filter_and_unnest as (
SELECT
d.blockchain,
d.token_bought_address as contract_address,
d.block_time as timestamp,
d.amount_usd/d.token_bought_amount as price,
d.amount_usd as volume -- in USD
FROM {{ source('dex','trades') }} d
INNER JOIN {{ref('prices_trusted_tokens')}} t
on t.blockchain = d.blockchain
and t.contract_address = d.token_sold_address -- the token traded against is trusted
LEFT JOIN {{ref('prices_trusted_tokens')}} anti_t
on anti_t.blockchain = d.blockchain
and anti_t.contract_address = d.token_bought_address -- the subjected token is already in trusted
WHERE d.amount_usd > 0 and token_bought_amount > 0 and token_bought_address is not null
and anti_t.contract_address is null
{% if is_incremental() %}
AND {{ incremental_predicate('d.block_time') }}
{% endif %}

UNION ALL

SELECT
d.blockchain,
d.token_sold_address as contract_address,
d.block_time as timestamp,
d.amount_usd/d.token_sold_amount as price,
d.amount_usd as volume -- in USD
FROM {{ source('dex','trades') }} d
INNER JOIN {{ref('prices_trusted_tokens')}} t
on t.blockchain = d.blockchain
and t.contract_address = d.token_bought_address -- the token traded against is trusted
LEFT JOIN {{ref('prices_trusted_tokens')}} anti_t
on anti_t.blockchain = d.blockchain
and anti_t.contract_address = d.token_sold_address -- the subjected token is already in trusted
WHERE d.amount_usd > 0 and token_sold_amount > 0 and token_sold_address is not null
and anti_t.contract_address is null
{% if is_incremental() %}
AND {{ incremental_predicate('d.block_time') }}
{% endif %}
)


SELECT
blockchain,
contract_address,
date_trunc('minute',timestamp) as timestamp,
sum(price*volume)/sum(volume) as price, -- vwap
0xRobin marked this conversation as resolved.
Show resolved Hide resolved
sum(volume) as volume,
'dex.trades' as source,
date_trunc('day',timestamp) as date -- partition
FROM dex_trades_filter_and_unnest
group by 1,2,3,7
23 changes: 23 additions & 0 deletions dbt_subprojects/tokens/models/prices_v2/prices_v2_latest.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{{ config(
schema='prices_v2',
alias = 'latest',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this model needed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not needed for the sqlmesh side, just thought it would be cheaper do do it directly on the sparse dataset then later on the filled one.

materialized = 'incremental',
file_format = 'delta',
incremental_strategy = 'merge',
unique_key = ['blockchain', 'contract_address']
)
}}


SELECT
blockchain
, contract_address
, max(timestamp) as timestamp
, max_by(price,timestamp) as price
, max_by(volume,timestamp) as volume
, max_by(source,timestamp) as source
FROM {{ ref('prices_v2_minute_raw') }}
{% if is_incremental() %}
WHERE {{ incremental_predicate('timestamp') }}
{% endif %}
GROUP BY 1,2
41 changes: 41 additions & 0 deletions dbt_subprojects/tokens/models/prices_v2/prices_v2_minute_raw.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{{ config(
schema='prices_v2',
alias = 'minute_raw',
file_format = 'delta',
materialized = 'incremental',
partition_by = ['date'],
incremental_strategy = 'merge',
unique_key = ['blockchain', 'contract_address', 'timestamp'],
incremental_predicates = [incremental_predicate('DBT_INTERNAL_DEST.timestamp')]
)
}}

-- this model feeds into sqlmesh which performs a forward fill and aggregates upto higher timeframes

{% set prices_models = [
ref('prices_v2_dex_minute')
,ref('prices_v2_coinpaprika_minute')
] %}


SELECT *
FROM
(
{% for model in prices_models %}
SELECT
blockchain
, contract_address
, timestamp
, price
, volume -- can be null
, source -- dex.trades/coinpaprika/..
, date -- partition
FROM {{ model }}
{% if is_incremental() %}
WHERE {{ incremental_predicate('timestamp') }}
{% endif %}
{% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}
)
Loading