diff --git a/Cargo.lock b/Cargo.lock index a7d18c629..b104b5e84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3542,7 +3542,9 @@ dependencies = [ "async-stream", "atty", "cargo-lock", + "chrono", "clap 4.4.8", + "digital_asset_types", "futures", "git-version", "hyper", @@ -3554,7 +3556,10 @@ dependencies = [ "program_transformers", "prometheus", "redis", + "rust-crypto", + "sea-orm", "serde", + "serde_json", "serde_yaml", "solana-sdk", "sqlx", @@ -4428,6 +4433,7 @@ dependencies = [ "mpl-bubblegum", "num-traits", "sea-orm", + "serde", "serde_json", "solana-sdk", "solana-transaction-status", diff --git a/nft_ingester2/Cargo.toml b/nft_ingester2/Cargo.toml index df11b29fb..f8e82b55b 100644 --- a/nft_ingester2/Cargo.toml +++ b/nft_ingester2/Cargo.toml @@ -9,7 +9,9 @@ publish = { workspace = true } anyhow = { workspace = true } async-stream = { workspace = true } atty = { workspace = true } +chrono = { workspace = true } clap = { workspace = true, features = ["cargo", "derive"] } +digital_asset_types = { workspace = true } futures = { workspace = true } hyper = { workspace = true, features = ["server"] } json5 = { workspace = true } @@ -20,7 +22,10 @@ opentelemetry_sdk = { workspace = true, features = ["trace"] } program_transformers = { workspace = true } prometheus = { workspace = true } redis = { workspace = true, features = ["tokio-comp", "tokio-native-tls-comp"] } +rust-crypto = { workspace = true } +sea-orm = { workspace = true, features = ["sqlx-postgres"] } serde = { workspace = true } +serde_json = { workspace = true } serde_yaml = { workspace = true } solana-sdk = { workspace = true } sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] } diff --git a/nft_ingester2/config-run.yml b/nft_ingester2/config-run.yml index 19a1b3f84..3af3ad2e7 100644 --- a/nft_ingester2/config-run.yml +++ b/nft_ingester2/config-run.yml @@ -20,7 +20,9 @@ redis: postgres: url: postgres://solana:solana@localhost/solana min_connections: 10 - max_connections: 25 + max_connections: 50 # `max_connection` should be bigger than `program_transformer.max_tasks_in_process` otherwise unresolved lock is possible program_transformer: transactions_cl_audits: false - max_tasks_in_process: 100 + max_tasks_in_process: 40 +download_metadata_handler: + max_attempts: 3 diff --git a/nft_ingester2/src/config.rs b/nft_ingester2/src/config.rs index 8c8d2fa4b..fad729312 100644 --- a/nft_ingester2/src/config.rs +++ b/nft_ingester2/src/config.rs @@ -3,6 +3,7 @@ use { serde::{de, Deserialize}, std::{collections::HashMap, net::SocketAddr, path::Path, time::Duration}, tokio::fs, + tracing::warn, yellowstone_grpc_proto::prelude::SubscribeRequest, yellowstone_grpc_tools::config::{ deserialize_usize_str, ConfigGrpcRequestAccounts, ConfigGrpcRequestCommitment, @@ -181,6 +182,15 @@ pub struct ConfigIngester { pub redis: ConfigIngesterRedis, pub postgres: ConfigIngesterPostgres, pub program_transformer: ConfigIngesterProgramTransformer, + pub download_metadata_handler: ConfigDownloadMetadataHandler, +} + +impl ConfigIngester { + pub fn check(&self) { + if self.postgres.max_connections < self.program_transformer.max_tasks_in_process { + warn!("`postgres.max_connections` should be bigger than `program_transformer.max_tasks_in_process` otherwise unresolved lock is possible"); + } + } } #[derive(Debug, Deserialize)] @@ -332,7 +342,7 @@ impl ConfigIngesterPostgres { } pub const fn default_max_connections() -> usize { - 25 + 50 } } @@ -353,6 +363,21 @@ impl ConfigIngesterProgramTransformer { } pub const fn default_max_tasks_in_process() -> usize { - 100 + 40 + } +} + +#[derive(Debug, Deserialize)] +pub struct ConfigDownloadMetadataHandler { + #[serde( + default = "ConfigDownloadMetadataHandler::default_max_attempts", + deserialize_with = "deserialize_usize_str" + )] + pub max_attempts: usize, +} + +impl ConfigDownloadMetadataHandler { + pub const fn default_max_attempts() -> usize { + 3 } } diff --git a/nft_ingester2/src/ingester.rs b/nft_ingester2/src/ingester.rs index 479e5bddb..6cc30e4f6 100644 --- a/nft_ingester2/src/ingester.rs +++ b/nft_ingester2/src/ingester.rs @@ -1,14 +1,17 @@ use { crate::{ - config::ConfigIngester, + config::{ConfigDownloadMetadataHandler, ConfigIngester}, postgres::{create_pool as pg_create_pool, metrics_pgpool}, prom::{ - program_transformer_task_status_inc, program_transformer_tasks_total_set, - ProgramTransformerTaskStatusKind, + download_metadata_inserted_total_inc, program_transformer_task_status_inc, + program_transformer_tasks_total_set, ProgramTransformerTaskStatusKind, }, redis::{metrics_xlen, ProgramTransformerInfo, RedisStream}, util::create_shutdown, }, + chrono::Utc, + crypto::{digest::Digest, sha2::Sha256}, + digital_asset_types::dao::{sea_orm_active_enums::TaskStatus, tasks}, futures::{ future::{pending, BoxFuture, FusedFuture, FutureExt}, stream::StreamExt, @@ -17,9 +20,18 @@ use { error::ProgramTransformerError, DownloadMetadataInfo, DownloadMetadataNotifier, ProgramTransformer, }, - std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, + sea_orm::{ + entity::{ActiveModelTrait, ActiveValue}, + error::{DbErr, RuntimeErr}, + SqlxPostgresConnector, + }, + sqlx::{Error as SqlxError, PgPool}, + std::{ + borrow::Cow, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, }, tokio::{ task::JoinSet, @@ -29,8 +41,6 @@ use { }; pub async fn run(config: ConfigIngester) -> anyhow::Result<()> { - println!("{:#?}", config); - // connect to Redis let client = redis::Client::open(config.redis.url.clone())?; let connection = client.get_multiplexed_tokio_connection().await?; @@ -55,6 +65,10 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> { async move { metrics_pgpool(pgpool).await } }); + // spawn extra task to save tasks about download asset metadata + let download_metadata_handler = + DownloadMetadataHandler::new(pgpool.clone(), config.download_metadata_handler)?; + // create redis stream reader let (mut redis_messages, redis_tasks_fut) = RedisStream::new(config.redis, connection).await?; tokio::pin!(redis_tasks_fut); @@ -62,12 +76,12 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> { // program transforms related let pt_accounts = Arc::new(ProgramTransformer::new( pgpool.clone(), - create_notifier(), + download_metadata_handler.create_notifier(), false, )); let pt_transactions = Arc::new(ProgramTransformer::new( pgpool.clone(), - create_notifier(), + download_metadata_handler.create_notifier(), config.program_transformer.transactions_cl_audits, )); let pt_max_tasks_in_process = config.program_transformer.max_tasks_in_process; @@ -214,14 +228,64 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> { result } -fn create_notifier() -> DownloadMetadataNotifier { - Box::new( - move |_info: DownloadMetadataInfo| -> BoxFuture< - 'static, - Result<(), Box>, - > { - // TODO - Box::pin(async move { Ok(()) }) - }, - ) +#[derive(Debug)] +struct DownloadMetadataHandler { + pgpool: PgPool, + max_attempts: i16, +} + +impl DownloadMetadataHandler { + pub fn new(pgpool: PgPool, config: ConfigDownloadMetadataHandler) -> anyhow::Result { + Ok(Self { + pgpool, + max_attempts: config.max_attempts.try_into()?, + }) + } + + pub fn create_notifier(&self) -> DownloadMetadataNotifier { + let pgpool = self.pgpool.clone(); + let max_attempts = self.max_attempts; + Box::new( + move |info: DownloadMetadataInfo| -> BoxFuture< + 'static, + Result<(), Box>, + > { + let pgpool = pgpool.clone(); + Box::pin(async move { + const NAME: &str = "DownloadMetadata"; + + let data = serde_json::to_value(info)?; + + let mut hasher = Sha256::new(); + hasher.input(NAME.as_bytes()); + hasher.input(serde_json::to_vec(&data)?.as_slice()); + let hash = hasher.result_str(); + + let model = tasks::ActiveModel { + id: ActiveValue::Set(hash), + task_type: ActiveValue::Set(NAME.to_owned()), + data: ActiveValue::Set(data), + status: ActiveValue::Set(TaskStatus::Pending), + created_at: ActiveValue::Set(Utc::now().naive_utc()), + locked_until: ActiveValue::Set(None), + locked_by: ActiveValue::Set(None), + max_attempts: ActiveValue::Set(max_attempts), + attempts: ActiveValue::Set(0), + duration: ActiveValue::Set(None), + errors: ActiveValue::Set(None), + }; + let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pgpool); + + match model.insert(&conn).await.map(|_mode| ()) { + // unique_violation + Err(DbErr::Query(RuntimeErr::SqlxError(SqlxError::Database(dberr)))) if dberr.code() == Some(Cow::Borrowed("23505")) => {}, + value => value?, + }; + download_metadata_inserted_total_inc(); + + Ok(()) + }) + }, + ) + } } diff --git a/nft_ingester2/src/main.rs b/nft_ingester2/src/main.rs index 0bd1f9b8d..7622b8aec 100644 --- a/nft_ingester2/src/main.rs +++ b/nft_ingester2/src/main.rs @@ -70,6 +70,7 @@ async fn main() -> anyhow::Result<()> { let config = config_load::(&args.config) .await .with_context(|| format!("failed to parse config from: {}", args.config))?; + config.check(); ingester::run(config).await } } diff --git a/nft_ingester2/src/prom.rs b/nft_ingester2/src/prom.rs index 884407760..e81de4255 100644 --- a/nft_ingester2/src/prom.rs +++ b/nft_ingester2/src/prom.rs @@ -5,7 +5,7 @@ use { service::{make_service_fn, service_fn}, Body, Request, Response, Server, StatusCode, }, - prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder}, + prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder}, std::{net::SocketAddr, sync::Once}, tracing::{error, info}, }; @@ -46,6 +46,10 @@ lazy_static::lazy_static! { Opts::new("program_transformer_task_status", "Status of processed messages"), &["status"], ).unwrap(); + + static ref DOWNLOAD_METADATA_INSERTED_TOTAL: IntCounter = IntCounter::new( + "download_metadata_inserted_total", "Total number of inserted tasks for download metadata" + ).unwrap(); } pub fn run_server(address: SocketAddr) -> anyhow::Result<()> { @@ -65,6 +69,7 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> { register!(PGPOOL_CONNECTIONS_TOTAL); register!(PROGRAM_TRANSFORMER_TASKS_TOTAL); register!(PROGRAM_TRANSFORMER_TASK_STATUS); + register!(DOWNLOAD_METADATA_INSERTED_TOTAL); VERSION .with_label_values(&[ @@ -171,3 +176,7 @@ pub fn program_transformer_task_status_inc(kind: ProgramTransformerTaskStatusKin }]) .inc() } + +pub fn download_metadata_inserted_total_inc() { + DOWNLOAD_METADATA_INSERTED_TOTAL.inc() +} diff --git a/program_transformers/Cargo.toml b/program_transformers/Cargo.toml index 31d35871f..f600caf98 100644 --- a/program_transformers/Cargo.toml +++ b/program_transformers/Cargo.toml @@ -13,6 +13,7 @@ futures = { workspace = true } mpl-bubblegum = { workspace = true } num-traits = { workspace = true } sea-orm = { workspace = true, features = [] } +serde = { workspace = true } serde_json = { workspace = true } solana-sdk = { workspace = true } solana-transaction-status = { workspace = true } diff --git a/program_transformers/src/lib.rs b/program_transformers/src/lib.rs index 996a0ffac..237603996 100644 --- a/program_transformers/src/lib.rs +++ b/program_transformers/src/lib.rs @@ -15,6 +15,7 @@ use { }, futures::future::BoxFuture, sea_orm::{DatabaseConnection, SqlxPostgresConnector}, + serde::Serialize, solana_sdk::{instruction::CompiledInstruction, pubkey::Pubkey, signature::Signature}, solana_transaction_status::InnerInstructions, sqlx::PgPool, @@ -44,7 +45,7 @@ pub struct TransactionInfo { pub meta_inner_instructions: Vec, } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] pub struct DownloadMetadataInfo { asset_data_id: Vec, uri: String,