Skip to content

Commit

Permalink
Merge pull request #75 from cowprotocol/use_new_tables
Browse files Browse the repository at this point in the history
Use new database layout
  • Loading branch information
harisang authored Oct 14, 2024
2 parents 21727e4 + 22b4323 commit eab8179
Show file tree
Hide file tree
Showing 25 changed files with 626 additions and 111 deletions.
24 changes: 23 additions & 1 deletion .github/workflows/pull_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,29 @@ on:
jobs:
python:
runs-on: ubuntu-latest
services:
postgres:
image: postgres
env:
POSTGRES_PASSWORD: postgres
POSTGRES_DB: mainnet
# Set health checks to wait until postgres has started
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
steps:
- uses: actions/checkout@v3
- name: Initialize database
run: |
for file in ${{ github.workspace }}/database/*.sql; do
psql -h localhost -U postgres -d mainnet -f "$file"
done
env:
PGPASSWORD: postgres
- name: Setup Python 3.12
uses: actions/setup-python@v3
with:
Expand All @@ -26,7 +47,8 @@ jobs:
- name: Type Check (mypy)
run: mypy src
- name: Tests
run: pytest tests/e2e/test_blockchain_data.py tests/e2e/test_imbalances_script.py
run: pytest tests/unit/ tests/e2e/
env:
NODE_URL: ${{ secrets.NODE_URL }}
SOLVER_SLIPPAGE_DB_URL: postgres:postgres@localhost:5432/mainnet
CHAIN_SLEEP_TIME: 1
1 change: 0 additions & 1 deletion Dockerfile.test_db
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
FROM postgres
ENV POSTGRES_PASSWORD=postgres
ENV POSTGRES_DB=mainnet
COPY ./database/01_table_creation.sql /docker-entrypoint-initdb.d/
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ daemon:

test_db:
docker build -t $(DOCKER_IMAGE_NAME) -f Dockerfile.test_db .
docker run -d --name $(DOCKER_CONTAINER_NAME) -p $(DB_PORT):$(DB_PORT) $(DOCKER_IMAGE_NAME)
docker run -d --name $(DOCKER_CONTAINER_NAME) -p $(DB_PORT):$(DB_PORT) -v ${PWD}/database/00_legacy_tables.sql:/docker-entrypoint-initdb.d/00_legacy_tables.sql -v ${PWD}/database/01_table_creation.sql:/docker-entrypoint-initdb.d/01_table_creation.sql $(DOCKER_IMAGE_NAME)

stop_test_db:
docker stop $(DOCKER_CONTAINER_NAME) || true
Expand Down
36 changes: 36 additions & 0 deletions database/00_legacy_tables.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
-- Database Schema for Token Imbalances and Slippage

-- raw_token_imbalances: stores raw token imbalances, one row per
-- (tx_hash, token_address) pair as enforced by the primary key.
create table raw_token_imbalances (
    auction_id    bigint      not null,
    chain_name    varchar(50) not null,
    block_number  bigint      not null,
    tx_hash       bytea       not null,
    token_address bytea       not null,
    -- nullable; 78 integer digits, no fractional part
    imbalance     numeric(78, 0),
    primary key (tx_hash, token_address)
);

-- slippage_prices: stores per-unit token prices in ETH, one row per
-- (tx_hash, token_address) pair as enforced by the primary key.
create table slippage_prices (
    chain_name    varchar(50) not null,
    source        varchar(50) not null,
    block_number  bigint      not null,
    tx_hash       bytea       not null,
    token_address bytea       not null,
    -- nullable; up to 42 significant digits, 18 of them fractional
    price         numeric(42, 18),
    primary key (tx_hash, token_address)
);

-- fees_new: stores fees (i.e. protocol fee, network fee) on a per-token
-- basis. A row is unique per (tx_hash, order_uid, token_address, fee_type).
create table fees_new (
    chain_name    varchar(50)    not null,
    auction_id    bigint         not null,
    block_number  bigint         not null,
    tx_hash       bytea          not null,
    order_uid     bytea          not null,
    token_address bytea          not null,
    fee_amount    numeric(78, 0) not null,
    fee_type      varchar(50)    not null, -- e.g. "protocol" or "network"
    primary key (tx_hash, order_uid, token_address, fee_type)
);
4 changes: 2 additions & 2 deletions database/01_table_creation.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE TABLE transaction_timestamps (
CREATE TABLE transaction_timestamp (
tx_hash bytea PRIMARY KEY,
time timestamp NOT NULL
);
Expand All @@ -15,7 +15,7 @@ CREATE TYPE PriceSource AS ENUM ('coingecko', 'moralis', 'dune', 'native');
CREATE TABLE prices (
token_address bytea NOT NULL,
time timestamp NOT NULL,
price numeric(60, 18) NOT NULL,
price numeric(78, 18) NOT NULL,
source PriceSource NOT NULL,

PRIMARY KEY (token_address, time, source)
Expand Down
5 changes: 1 addition & 4 deletions requirements.in
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
dune-client
moralis
pandas
pandas-stubs
psycopg2
psycopg
python-dotenv
requests
types-psycopg2
types-requests
SQLAlchemy
web3
Expand Down
20 changes: 2 additions & 18 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,11 @@ mypy-extensions==1.0.0
# typing-inspect
ndjson==0.3.1
# via dune-client
numpy==1.26.4
# via
# pandas
# pandas-stubs
packaging==24.1
# via
# black
# marshmallow
# pytest
pandas==2.2.1
# via -r requirements.in
pandas-stubs==2.2.2.240909
# via -r requirements.in
parsimonious==0.10.0
# via eth-abi
pathspec==0.12.1
Expand All @@ -139,7 +131,7 @@ platformdirs==4.3.6
# pylint
pluggy==1.5.0
# via pytest
psycopg2==2.9.9
psycopg==3.2.3
# via -r requirements.in
pycryptodome==3.20.0
# via
Expand All @@ -159,11 +151,8 @@ python-dateutil==2.9.0.post0
# via
# dune-client
# moralis
# pandas
python-dotenv==1.0.0
# via -r requirements.in
pytz==2024.2
# via pandas
pyunormalize==16.0.0
# via web3
regex==2024.9.11
Expand All @@ -187,12 +176,8 @@ toolz==0.12.1
# via cytoolz
types-deprecated==1.2.9.20240311
# via dune-client
types-psycopg2==2.9.21.20240819
# via -r requirements.in
types-python-dateutil==2.9.0.20240906
# via dune-client
types-pytz==2024.2.0.20240913
# via pandas-stubs
types-pyyaml==6.0.12.20240917
# via dune-client
types-requests==2.32.0.20240914
Expand All @@ -207,15 +192,14 @@ typing-extensions==4.12.2
# eth-typing
# moralis
# mypy
# psycopg
# pydantic
# pydantic-core
# sqlalchemy
# typing-inspect
# web3
typing-inspect==0.9.0
# via dataclasses-json
tzdata==2024.1
# via pandas
urllib3==2.2.3
# via
# moralis
Expand Down
31 changes: 31 additions & 0 deletions src/helpers/blockchain_data.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from hexbytes import HexBytes
from web3 import Web3
from web3.types import HexStr

from contracts.erc20_abi import erc20_abi
from src.helpers.config import logger
from src.constants import SETTLEMENT_CONTRACT_ADDRESS, INVALIDATED_ORDER_TOPIC

Expand Down Expand Up @@ -71,3 +74,31 @@ def get_auction_id(self, tx_hash: str) -> int:
# convert bytes to int
auction_id = int.from_bytes(call_data_bytes[-8:], byteorder="big")
return auction_id

def get_transaction_timestamp(self, tx_hash: str) -> tuple[str, int]:
    """Look up the timestamp of the block that contains a transaction.

    The receipt for `tx_hash` is fetched first; its `blockNumber` is then
    used to fetch the block, whose `timestamp` field is returned.

    Args:
        tx_hash: Transaction hash; wrapped in `HexStr` before being sent
            to the node.

    Returns:
        A `(tx_hash, timestamp)` pair. `tx_hash` is passed through unchanged.
    """
    tx_receipt = self.web3.eth.get_transaction_receipt(HexStr(tx_hash))
    containing_block = self.web3.eth.get_block(tx_receipt["blockNumber"])
    return tx_hash, containing_block["timestamp"]

def get_transaction_tokens(self, tx_hash: str) -> list[tuple[str, str]]:
    """List the contracts that emitted a Transfer event in a transaction.

    A log counts as a transfer when it has at least one topic and its first
    topic equals the keccak hash of `Transfer(address,address,uint256)`.

    Args:
        tx_hash: Transaction hash; wrapped in `HexStr` before being sent
            to the node.

    Returns:
        One `(tx_hash, token_address)` pair per distinct emitting address.
        Addresses are de-duplicated through a set, so the order of the
        returned list is not defined.
    """
    tx_receipt = self.web3.eth.get_transaction_receipt(HexStr(tx_hash))
    transfer_signature = self.web3.keccak(text="Transfer(address,address,uint256)")

    # Set comprehension: an address emitting several transfers appears once.
    emitters: set[str] = {
        entry["address"]
        for entry in tx_receipt["logs"]
        if entry["topics"] and entry["topics"][0] == transfer_signature
    }

    return [(tx_hash, emitter) for emitter in emitters]

def get_token_decimals(self, token_address: str) -> int:
    """Get number of decimals for a token.

    The address is converted to checksum form, bound to the ERC-20 ABI, and
    the contract's `decimals()` function is called on the node.

    Args:
        token_address: Address of the token contract.

    Returns:
        Whatever the contract's `decimals()` call returns.
    """
    checksummed = Web3.to_checksum_address(token_address)
    erc20 = self.web3.eth.contract(address=checksummed, abi=erc20_abi)
    decimals_call = erc20.functions.decimals()
    return decimals_call.call()
2 changes: 1 addition & 1 deletion src/helpers/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def create_db_connection(db_type: str) -> Engine:
if not db_url:
raise ValueError(f"{db_type} database URL not found in environment variables.")

return create_engine(f"postgresql+psycopg2://{db_url}")
return create_engine(f"postgresql+psycopg://{db_url}")


def check_db_connection(engine: Engine, db_type: str) -> Engine:
Expand Down
103 changes: 102 additions & 1 deletion src/helpers/database.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
from sqlalchemy import text
from datetime import datetime

from hexbytes import HexBytes
import psycopg
from sqlalchemy import text, insert, Table, Column, Integer, LargeBinary, MetaData
from sqlalchemy.engine import Engine

from src.helpers.config import check_db_connection, logger
from src.helpers.helper_functions import read_sql_file
from src.constants import NULL_ADDRESS_STRING
Expand Down Expand Up @@ -124,3 +129,99 @@ def write_fees(
"fee_recipient": final_recipient,
},
)

def write_transaction_timestamp(
    self, transaction_timestamp: tuple[str, int]
) -> None:
    """Insert one row into the `transaction_timestamp` table.

    Args:
        transaction_timestamp: `(tx_hash, unix_time)`. The first two
            characters of `tx_hash` are dropped before hex-decoding
            (presumably a "0x" prefix -- verify against callers).
    """
    tx_hash = transaction_timestamp[0]
    unix_time = transaction_timestamp[1]

    params = {
        "tx_hash": bytes.fromhex(tx_hash[2:]),
        # NOTE(review): fromtimestamp() without tz gives a naive datetime in
        # the host's local time zone -- confirm writers and readers agree.
        "time": datetime.fromtimestamp(unix_time),
    }
    statement = (
        "INSERT INTO transaction_timestamp (tx_hash, time) VALUES (:tx_hash, :time);"
    )
    self.execute_and_commit(statement, params)

def write_transaction_tokens(
    self, transaction_tokens: list[tuple[str, str]]
) -> None:
    """Insert `(tx_hash, token_address)` pairs into `transaction_tokens`.

    Each pair is written with its own `execute_and_commit` call, in input
    order.

    Args:
        transaction_tokens: Pairs of hex strings. The first two characters
            of each string are dropped before hex-decoding (presumably a
            "0x" prefix -- verify against callers).
    """
    statement = (
        "INSERT INTO transaction_tokens (tx_hash, token_address) "
        "VALUES (:tx_hash, :token_address);"
    )
    # Lazy generator: a row is decoded only right before it is written, so a
    # malformed entry does not prevent the rows ahead of it from being stored.
    rows = (
        {
            "tx_hash": bytes.fromhex(hash_hex[2:]),
            "token_address": bytes.fromhex(address_hex[2:]),
        }
        for hash_hex, address_hex in transaction_tokens
    )
    for row in rows:
        self.execute_and_commit(statement, row)

def write_prices_new(self, prices: list[tuple[str, int, float, str]]) -> None:
    """Write prices to database.

    Each entry is inserted into the `prices` table with its own
    `execute_and_commit` call. An entry that the database rejects with a
    numeric-out-of-range error is logged as a warning and skipped; the
    remaining entries are still written. Any other error propagates.

    Args:
        prices: `(token_address, time, price, source)` tuples.
            `token_address` is a hex string whose first two characters are
            dropped before decoding (presumably a "0x" prefix -- verify
            against callers); `time` is a unix timestamp.
    """
    query = (
        "INSERT INTO prices (token_address, time, price, source) "
        "VALUES (:token_address, :time, :price, :source);"
    )
    for token_address, time, price, source in prices:
        try:
            self.execute_and_commit(
                query,
                {
                    "token_address": bytes.fromhex(token_address[2:]),
                    # NOTE(review): naive local time -- confirm host time zone.
                    "time": datetime.fromtimestamp(time),
                    "price": price,
                    "source": source,
                },
            )
        except Exception as err:  # pylint: disable=broad-exception-caught
            # The overflow can arrive either as the raw psycopg exception or,
            # when the statement runs through SQLAlchemy, wrapped in a
            # DBAPIError that exposes the driver exception on `.orig`.
            # Catching only the raw class would miss the wrapped form.
            overflow = psycopg.errors.NumericValueOutOfRange
            wrapped = getattr(err, "orig", None)
            if not isinstance(err, overflow) and not isinstance(wrapped, overflow):
                raise
            logger.warning(
                f"Error while writing price data. token: {token_address}, "
                f"time: {time}, price: {price}, source: {source}"
            )

def get_latest_transaction(self) -> str | None:
    """Get latest transaction hash.

    Reads the `tx_hash` of the row with the greatest `time` in the
    `transaction_timestamp` table.

    Returns:
        The hash as a 0x-prefixed hex string, or None when the query
        yields no row.
    """
    newest_row = self.execute_query(
        "SELECT tx_hash FROM transaction_timestamp ORDER BY time DESC LIMIT 1;",
        {},
    ).fetchone()

    return None if newest_row is None else HexBytes(newest_row[0]).to_0x_hex()

def get_tokens_without_decimals(self) -> list[str]:
    """Get tokens without decimals.

    Selects every `token_address` in `transaction_tokens` that does not
    appear in `token_decimals`.

    Returns:
        The distinct addresses as 0x-prefixed hex strings. Duplicates are
        removed through a set, so the order of the list is not defined.
    """
    query = (
        "SELECT token_address FROM transaction_tokens "
        "WHERE token_address not in (SELECT token_address FROM token_decimals);"
    )
    rows = self.execute_query(query, {}).fetchall()

    distinct_addresses = set()
    for row in rows:
        distinct_addresses.add(HexBytes(row[0]).to_0x_hex())
    return list(distinct_addresses)

def write_token_decimals(self, token_decimals: list[tuple[str, int]]) -> None:
    """Bulk-insert token decimals into the `token_decimals` table.

    Args:
        token_decimals: `(token_address, decimals)` pairs. The first two
            characters of `token_address` are dropped before hex-decoding
            (presumably a "0x" prefix -- verify against callers). An empty
            list is a no-op.
    """
    # Nothing to write: skip the reconnect, the table reflection and the
    # INSERT. Handing SQLAlchemy an empty parameter list is not a no-op --
    # it is treated like "no parameters", so an INSERT would still be sent.
    if not token_decimals:
        return

    self.engine = check_db_connection(self.engine, "solver_slippage")

    # Define the table without creating a model class; the column
    # definitions are reflected from the live database.
    token_decimals_table = Table(
        "token_decimals", MetaData(), autoload_with=self.engine
    )

    # Prepare the data
    records = [
        {"token_address": bytes.fromhex(token_address[2:]), "decimals": decimals}
        for token_address, decimals in token_decimals
    ]

    # Execute the bulk insert (a list of dicts makes this an executemany).
    with self.engine.connect() as conn:
        conn.execute(token_decimals_table.insert(), records)
        conn.commit()
2 changes: 1 addition & 1 deletion src/imbalances_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from web3 import Web3
from web3.datastructures import AttributeDict
from web3.types import TxReceipt
from web3.types import HexStr, TxReceipt
from src.helpers.config import CHAIN_RPC_ENDPOINTS, logger
from src.constants import (
SETTLEMENT_CONTRACT_ADDRESS,
Expand Down
Loading

0 comments on commit eab8179

Please sign in to comment.