diff --git a/Makefile b/Makefile
index 81127cd204..4a4133b1b4 100644
--- a/Makefile
+++ b/Makefile
@@ -37,9 +37,6 @@ reset-tf-provider:
 delete-bq-tables:
 	cd tf && tofu state list | grep 'module\.setup\.google_bigquery_table\.' | awk '{print "-target='\''" $$1 "'\''"}' | xargs tofu destroy -auto-approve
 
-run-tf:
-	cd tf && tofu init && tofu apply -auto-approve
-
 init-bq: delete-bq-tables reset-tf-state clean-deps start-deps setup-db
 	rm tf/import.tf || true
 	cd tf && tofu init && tofu apply -auto-approve || true
@@ -47,7 +44,7 @@ init-bq: delete-bq-tables reset-tf-state clean-deps start-deps setup-db
 	cd tf && tofu apply -auto-approve
 	git checkout tf/import.tf
 
-reset-deps: reset-tf-state clean-deps start-deps setup-db run-tf
+reset-deps: reset-tf-state clean-deps start-deps setup-db
 
 run-server:
 	cargo run --bin lana-cli --features sim-time -- --config ./bats/lana-sim-time.yml | tee .e2e-logs
@@ -68,10 +65,10 @@ build:
 build-for-tests:
 	SQLX_OFFLINE=true cargo build --locked --features sim-time
 
-e2e: reset-tf-state clean-deps start-deps build-for-tests run-tf
+e2e: reset-tf-state clean-deps start-deps build-for-tests
 	bats -t bats
 
-e2e-in-ci: bump-cala-docker-image clean-deps start-deps build-for-tests run-tf
+e2e-in-ci: clean-deps start-deps build-for-tests
 	SA_CREDS_BASE64=$$(cat ./dev/fake-service-account.json | tr -d '\n' | base64 -w 0) bats -t bats
 
@@ -79,15 +76,7 @@ sdl:
 	SQLX_OFFLINE=true cargo run --bin write_sdl > lana/admin-server/src/graphql/schema.graphql
 	cd apps/admin-panel && pnpm install && pnpm codegen
 
-bump-cala-schema:
-	curl -H "Authorization: token ${GITHUB_TOKEN}" https://raw.githubusercontent.com/GaloyMoney/cala-enterprise/main/schema.graphql > lana/app/src/ledger/cala/graphql/schema.graphql
-
-bump-cala-docker-image:
-	docker compose pull cala
-
-bump-cala: bump-cala-docker-image bump-cala-schema
-
-test-in-ci: start-deps setup-db run-tf
+test-in-ci: start-deps setup-db
 	cargo nextest run --verbose --locked
 
 build-x86_64-unknown-linux-musl-release:
diff --git a/docker-compose.yml b/docker-compose.yml
index a3ded2d678..adc11c360e 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -4,8 +4,6 @@ services:
     command: sh -c "while :; do sleep 10; done"
     depends_on:
       - core-pg
-      - cala-pg
-      - cala
       - otel-agent
       - kratos
       - kratos-pg
@@ -24,34 +22,6 @@
       interval: 1s
       timeout: 1s
       retries: 20
-  cala-pg:
-    image: postgres:16.4
-    command: -c 'max_connections=200'
-    ports:
-      - "5432:5432"
-    environment:
-      - POSTGRES_USER=user
-      - POSTGRES_PASSWORD=password
-      - POSTGRES_DB=pg
-    healthcheck:
-      test: ["CMD-SHELL", "pg_isready"]
-      interval: 1s
-      timeout: 1s
-      retries: 20
-  cala:
-    image: us.gcr.io/galoyorg/cala-enterprise:edge
-    ports:
-      - "2252:2252"
-    volumes:
-      - ./dev/cala.yml:/cala.yml
-    environment:
-      - PG_CON=postgresql://user:password@cala-pg:5432/pg
-      - CALA_CONFIG=/cala.yml
-      - BFX_LOCAL_TESTING=true
-      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-agent:4317
-    depends_on:
-      - cala-pg
-      - otel-agent
   otel-agent:
     ports:
       - "4317:4317" # OpenTelemetry receiver
diff --git a/lana/app/src/app/mod.rs b/lana/app/src/app/mod.rs
index 9e2a5931f2..07c83d87b1 100644
--- a/lana/app/src/app/mod.rs
+++ b/lana/app/src/app/mod.rs
@@ -15,7 +15,6 @@ use crate::{
     credit_facility::CreditFacilities,
     customer::Customers,
     dashboard::Dashboard,
-    data_export::Export,
    deposit::Deposits,
     document::Documents,
     governance::Governance,
@@ -60,17 +59,16 @@ impl LanaApp {
 
         sqlx::migrate!().run(&pool).await?;
         let mut jobs = Jobs::new(&pool, config.job_execution);
-        let export = Export::new(config.ledger.cala_url.clone(), &jobs);
         let audit = Audit::new(&pool);
         let authz = init_authz(&pool, &audit).await?;
         let outbox = Outbox::init(&pool).await?;
         let dashboard = Dashboard::init(&pool, &authz, &jobs, &outbox).await?;
         let governance = Governance::new(&pool, &authz, &outbox);
         let ledger = Ledger::init(config.ledger, &authz).await?;
-        let price = Price::init(&jobs, &export).await?;
+        let price = Price::init(&jobs).await?;
         let storage = Storage::new(&config.storage);
         let documents = Documents::new(&pool, &storage, &authz);
-        let report = Reports::init(&pool, &config.report, &authz, &jobs, &storage, &export).await?;
+        let report = Reports::init(&pool, &config.report, &authz, &jobs, &storage).await?;
         let users = Users::init(&pool, &authz, &outbox, config.user.superuser_email).await?;
 
         let cala_config = cala_ledger::CalaLedgerConfig::builder()
@@ -98,8 +96,8 @@ impl LanaApp {
             String::from("OMNIBUS_ACCOUNT_ID"),
         )
         .await?;
-        let customers = Customers::new(&pool, &config.customer, &deposits, &authz, &export);
-        let applicants = Applicants::new(&pool, &config.sumsub, &customers, &jobs, &export);
+        let customers = Customers::new(&pool, &config.customer, &deposits, &authz);
+        let applicants = Applicants::new(&pool, &config.sumsub, &customers, &jobs);
 
         let collateral_factory = chart_of_accounts.transaction_account_factory(
             accounting_init.chart_ids.off_balance_sheet,
@@ -130,7 +128,6 @@ impl LanaApp {
             config.credit_facility,
             &governance,
             &jobs,
-            &export,
            &authz,
             &deposits,
             &price,
@@ -145,7 +142,7 @@ impl LanaApp {
             accounting_init.journal_id,
         )
         .await?;
-        let terms_templates = TermsTemplates::new(&pool, &authz, &export);
+        let terms_templates = TermsTemplates::new(&pool, &authz);
 
         jobs.start_poll().await?;
 
         Ok(Self {
diff --git a/lana/app/src/applicant/error.rs b/lana/app/src/applicant/error.rs
index a63e64dc0d..b4b678b8eb 100644
--- a/lana/app/src/applicant/error.rs
+++ b/lana/app/src/applicant/error.rs
@@ -24,6 +24,4 @@ pub enum ApplicantError {
     UuidError(#[from] uuid::Error),
     #[error("ApplicantError - JobError: {0}")]
     JobError(#[from] crate::job::error::JobError),
-    #[error("ApplicantError - CalaError: {0}")]
-    ExportError(#[from] crate::data_export::error::ExportError),
 }
diff --git a/lana/app/src/applicant/job.rs b/lana/app/src/applicant/job.rs
index d9a666b960..24907f5744 100644
--- a/lana/app/src/applicant/job.rs
+++ b/lana/app/src/applicant/job.rs
@@ -2,11 +2,7 @@ use async_trait::async_trait;
 use serde::{Deserialize, Serialize};
 use sqlx::PgPool;
 
-use crate::{
-    data_export::{Export, ExportSumsubApplicantData, SumsubContentType},
-    job::*,
-    primitives::CustomerId,
-};
+use crate::{job::*, primitives::CustomerId};
 
 use super::{repo::ApplicantRepo, SumsubClient};
 
@@ -20,15 +16,14 @@ impl JobConfig for SumsubExportConfig {
 }
 
 pub struct SumsubExportInitializer {
-    pub(super) export: Export,
     pub(super) sumsub_client: SumsubClient,
     pub(super) applicants: ApplicantRepo,
 }
 
 impl SumsubExportInitializer {
-    pub fn new(export: Export, sumsub_client: SumsubClient, pool: &PgPool) -> Self {
+    pub fn new(sumsub_client: SumsubClient, pool: &PgPool) -> Self {
         Self {
-            export,
+            // export,
             sumsub_client,
             applicants: ApplicantRepo::new(pool),
         }
@@ -47,7 +42,6 @@ impl JobInitializer for SumsubExportInitializer {
     fn init(&self, job: &Job) -> Result<Box<dyn JobRunner>, Box<dyn std::error::Error>> {
         Ok(Box::new(SumsubExportJobRunner {
             config: job.config()?,
-            export: self.export.clone(),
             sumsub_client: self.sumsub_client.clone(),
             applicants: self.applicants.clone(),
         }))
     }
 }
@@ -56,7 +50,7 @@
 pub struct SumsubExportJobRunner {
     config: SumsubExportConfig,
-    export: Export,
+    // export: Export,
     sumsub_client: SumsubClient,
     applicants: ApplicantRepo,
 }
@@ -67,36 +61,36 @@ impl JobRunner for SumsubExportJobRunner {
     async fn run(&self, _: CurrentJob) -> Result<JobCompletion, Box<dyn std::error::Error>> {
         match &self.config {
             SumsubExportConfig::Webhook { callback_id } => {
-                let webhook_data = self
+                let _webhook_data = self
                     .applicants
                     .find_webhook_data_by_id(*callback_id)
                     .await?;
 
-                self.export
-                    .export_sum_sub_applicant_data(ExportSumsubApplicantData {
-                        customer_id: webhook_data.customer_id,
-                        content: serde_json::to_string(&webhook_data)?,
-                        content_type: SumsubContentType::Webhook,
-                        uploaded_at: webhook_data.timestamp,
-                    })
-                    .await?;
+                // self.export
+                //     .export_sum_sub_applicant_data(ExportSumsubApplicantData {
+                //         customer_id: webhook_data.customer_id,
+                //         content: serde_json::to_string(&webhook_data)?,
+                //         content_type: SumsubContentType::Webhook,
+                //         uploaded_at: webhook_data.timestamp,
+                //     })
+                //     .await?;
 
                 Ok(JobCompletion::Complete)
             }
             SumsubExportConfig::SensitiveInfo { customer_id } => {
-                let content = self
+                let _content = self
                     .sumsub_client
                     .get_applicant_details(*customer_id)
                     .await?;
 
-                self.export
-                    .export_sum_sub_applicant_data(ExportSumsubApplicantData {
-                        customer_id: *customer_id,
-                        content,
-                        content_type: SumsubContentType::SensitiveInfo,
-                        uploaded_at: chrono::Utc::now(),
-                    })
-                    .await?;
+                // self.export
+                //     .export_sum_sub_applicant_data(ExportSumsubApplicantData {
+                //         customer_id: *customer_id,
+                //         content,
+                //         content_type: SumsubContentType::SensitiveInfo,
+                //         uploaded_at: chrono::Utc::now(),
+                //     })
+                //     .await?;
 
                 Ok(JobCompletion::Complete)
             }
diff --git a/lana/app/src/applicant/mod.rs b/lana/app/src/applicant/mod.rs
index 60a0d41ebe..3b96e46a38 100644
--- a/lana/app/src/applicant/mod.rs
+++ b/lana/app/src/applicant/mod.rs
@@ -10,7 +10,6 @@ use tracing::instrument;
 
 use crate::{
     customer::Customers,
-    data_export::Export,
     job::Jobs,
     primitives::{CustomerId, JobId},
 };
@@ -102,11 +101,11 @@ impl Applicants {
         config: &SumsubConfig,
         users: &Customers,
         jobs: &Jobs,
-        export: &Export,
+        // export: &Export,
     ) -> Self {
         let sumsub_client = SumsubClient::new(config);
         jobs.add_initializer(SumsubExportInitializer::new(
-            export.clone(),
+            // export.clone(),
             sumsub_client.clone(),
             pool,
         ));
diff --git a/lana/app/src/credit_facility/disbursal/repo.rs b/lana/app/src/credit_facility/disbursal/repo.rs
index 41de727b4a..84dec5e360 100644
--- a/lana/app/src/credit_facility/disbursal/repo.rs
+++ b/lana/app/src/credit_facility/disbursal/repo.rs
@@ -2,7 +2,7 @@ use sqlx::PgPool;
 
 use es_entity::*;
 
-use crate::{data_export::Export, primitives::*};
+use crate::primitives::*;
 
 use super::{entity::*, error::DisbursalError};
 
@@ -18,15 +18,11 @@ use super::{entity::*, error::DisbursalError};
 )]
 pub(in crate::credit_facility) struct DisbursalRepo {
     pool: PgPool,
-    _export: Export,
 }
 
 impl DisbursalRepo {
-    pub fn new(pool: &PgPool, export: &Export) -> Self {
-        Self {
-            pool: pool.clone(),
-            _export: export.clone(),
-        }
+    pub fn new(pool: &PgPool) -> Self {
+        Self { pool: pool.clone() }
     }
 }
diff --git a/lana/app/src/credit_facility/interest_accrual/repo.rs b/lana/app/src/credit_facility/interest_accrual/repo.rs
index 7a9eb331d4..3339ac5989 100644
--- a/lana/app/src/credit_facility/interest_accrual/repo.rs
+++ b/lana/app/src/credit_facility/interest_accrual/repo.rs
@@ -9,8 +9,6 @@ use crate::{
 
 use super::{entity::*, InterestAccrualError};
 
-const BQ_TABLE_NAME: &str = "interest_accrual_events";
-
 #[derive(EsRepo, Clone)]
 #[es_repo(
     entity = "InterestAccrual",
@@ -18,8 +16,7 @@ const BQ_TABLE_NAME: &str = "interest_accrual_events";
     columns(
         credit_facility_id(ty = "CreditFacilityId", update(persist = false), list_for, parent),
         idx(ty = "InterestAccrualIdx", update(persist = false)),
-    ),
-    post_persist_hook = "export"
+    )
 )]
 pub(in crate::credit_facility) struct InterestAccrualRepo {
     pool: PgPool,
@@ -33,16 +30,4 @@ impl InterestAccrualRepo {
             export: export.clone(),
         }
     }
-
-    async fn export(
-        &self,
-        db: &mut es_entity::DbOp<'_>,
-        _: &InterestAccrual,
-        events: impl Iterator<Item = &PersistedEvent<InterestAccrualEvent>>,
-    ) -> Result<(), InterestAccrualError> {
-        self.export
-            .es_entity_export(db, BQ_TABLE_NAME, events)
-            .await?;
-        Ok(())
-    }
 }
diff --git a/lana/app/src/credit_facility/mod.rs b/lana/app/src/credit_facility/mod.rs
index 0a6dd6bb5d..c07ecd7e68 100644
--- a/lana/app/src/credit_facility/mod.rs
+++ b/lana/app/src/credit_facility/mod.rs
@@ -24,7 +24,6 @@ use tracing::instrument;
 use crate::{
     audit::AuditInfo,
     authorization::{Authorization, CreditFacilityAction, Object},
-    data_export::Export,
     deposit::Deposits,
     governance::Governance,
     job::*,
@@ -78,7 +77,6 @@ impl CreditFacilities {
         config: CreditFacilityConfig,
         governance: &Governance,
         jobs: &Jobs,
-        export: &Export,
         authz: &Authorization,
         deposits: &Deposits,
         price: &Price,
@@ -92,9 +90,9 @@ impl CreditFacilities {
         cala: &CalaLedger,
         journal_id: cala_ledger::JournalId,
     ) -> Result<Self, CreditFacilityError> {
-        let publisher = CreditFacilityPublisher::new(export, outbox);
+        let publisher = CreditFacilityPublisher::new(outbox);
         let credit_facility_repo = CreditFacilityRepo::new(pool, &publisher);
-        let disbursal_repo = DisbursalRepo::new(pool, export);
+        let disbursal_repo = DisbursalRepo::new(pool);
         let ledger = CreditLedger::init(cala, journal_id).await?;
         let approve_disbursal = ApproveDisbursal::new(
             &disbursal_repo,
diff --git a/lana/app/src/credit_facility/publisher.rs b/lana/app/src/credit_facility/publisher.rs
index 3e5a82b2c0..5fc3e3af1b 100644
--- a/lana/app/src/credit_facility/publisher.rs
+++ b/lana/app/src/credit_facility/publisher.rs
@@ -1,21 +1,17 @@
-const BQ_TABLE_NAME: &str = "credit_facility_events";
-
 use lana_events::{CreditEvent, FacilityCollateralUpdateAction};
 
-use crate::{data_export::Export, outbox::Outbox};
+use crate::outbox::Outbox;
 
 use super::{entity::*, error::*};
 
 #[derive(Clone)]
 pub struct CreditFacilityPublisher {
-    pub(super) export: Export,
     outbox: Outbox,
 }
 
 impl CreditFacilityPublisher {
-    pub fn new(export: &Export, outbox: &Outbox) -> Self {
+    pub fn new(outbox: &Outbox) -> Self {
         Self {
-            export: export.clone(),
             outbox: outbox.clone(),
         }
     }
@@ -26,10 +22,6 @@ impl CreditFacilityPublisher {
         entity: &CreditFacility,
         new_events: es_entity::LastPersisted<'_, CreditFacilityEvent>,
     ) -> Result<(), CreditFacilityError> {
-        self.export
-            .es_entity_export(db, BQ_TABLE_NAME, new_events.clone())
-            .await?;
-
         use CreditFacilityEvent::*;
         let publish_events = new_events
             .filter_map(|event| match &event.event {
diff --git a/lana/app/src/credit_facility/repo.rs b/lana/app/src/credit_facility/repo.rs
index 145500b841..cc9f051ce1 100644
--- a/lana/app/src/credit_facility/repo.rs
+++ b/lana/app/src/credit_facility/repo.rs
@@ -4,7 +4,7 @@ use sqlx::PgPool;
 use es_entity::*;
 pub use es_entity::{ListDirection, Sort};
 
-use crate::{data_export::Export, primitives::*, terms::CollateralizationState};
+use crate::{primitives::*, terms::CollateralizationState};
 
 use super::{
     entity::*,
@@ -45,7 +45,7 @@ pub struct CreditFacilityRepo {
 
 impl CreditFacilityRepo {
     pub(super) fn new(pool: &PgPool, publisher: &CreditFacilityPublisher) -> Self {
-        let interest_accruals = InterestAccrualRepo::new(pool, &publisher.export);
+        let interest_accruals = InterestAccrualRepo::new(pool);
         Self {
             pool: pool.clone(),
             publisher: publisher.clone(),
@@ -63,8 +63,6 @@ impl CreditFacilityRepo {
     }
 }
 
-const INTEREST_ACCRUAL_BQ_TABLE_NAME: &str = "interest_accrual_events";
-
 #[derive(EsRepo, Clone)]
 #[es_repo(
     entity = "InterestAccrual",
@@ -72,32 +70,15 @@ const INTEREST_ACCRUAL_BQ_TABLE_NAME: &str = "interest_accrual_events";
     columns(
         credit_facility_id(ty = "CreditFacilityId", update(persist = false), list_for, parent),
         idx(ty = "InterestAccrualIdx", update(persist = false), list_by),
-    ),
-    post_persist_hook = "export"
+    )
 )]
 pub(super) struct InterestAccrualRepo {
     pool: PgPool,
-    export: Export,
 }
 
 impl InterestAccrualRepo {
-    pub fn new(pool: &PgPool, export: &Export) -> Self {
-        Self {
-            pool: pool.clone(),
-            export: export.clone(),
-        }
-    }
-
-    async fn export(
-        &self,
-        db: &mut es_entity::DbOp<'_>,
-        _: &InterestAccrual,
-        events: impl Iterator<Item = &PersistedEvent<InterestAccrualEvent>>,
-    ) -> Result<(), InterestAccrualError> {
-        self.export
-            .es_entity_export(db, INTEREST_ACCRUAL_BQ_TABLE_NAME, events)
-            .await?;
-        Ok(())
+    pub fn new(pool: &PgPool) -> Self {
+        Self { pool: pool.clone() }
     }
 }
diff --git a/lana/app/src/customer/mod.rs b/lana/app/src/customer/mod.rs
index fd9499029a..912b5699ed 100644
--- a/lana/app/src/customer/mod.rs
+++ b/lana/app/src/customer/mod.rs
@@ -12,7 +12,6 @@ use authz::PermissionCheck;
 use crate::{
     audit::{AuditInfo, AuditSvc},
     authorization::{Action, Authorization, CustomerAction, CustomerAllOrOne, Object},
-    data_export::Export,
     deposit::Deposits,
     primitives::{CustomerId, KycLevel, Subject},
 };
@@ -37,9 +36,8 @@ impl Customers {
         config: &CustomerConfig,
         deposits: &Deposits,
         authz: &Authorization,
-        export: &Export,
     ) -> Self {
-        let repo = CustomerRepo::new(pool, export);
+        let repo = CustomerRepo::new(pool);
         let kratos = KratosClient::new(&config.kratos);
         Self {
             repo,
diff --git a/lana/app/src/customer/repo.rs b/lana/app/src/customer/repo.rs
index 2fb95ae499..8cff4be4c9 100644
--- a/lana/app/src/customer/repo.rs
+++ b/lana/app/src/customer/repo.rs
@@ -3,12 +3,10 @@ use sqlx::PgPool;
 pub use es_entity::Sort;
 use es_entity::*;
 
-use crate::{data_export::Export, primitives::*};
+use crate::primitives::*;
 
 use super::{entity::*, error::*};
 
-const BQ_TABLE_NAME: &str = "customer_events";
-
 #[derive(EsRepo, Clone)]
 #[es_repo(
     entity = "Customer",
@@ -17,32 +15,15 @@ const BQ_TABLE_NAME: &str = "customer_events";
     columns(
         email(ty = "String", list_by),
         telegram_id(ty = "String", list_by),
         status(ty = "AccountStatus", list_for)
-    ),
-    post_persist_hook = "export"
+    )
 )]
 pub struct CustomerRepo {
     pool: PgPool,
-    export: Export,
 }
 
 impl CustomerRepo {
-    pub(super) fn new(pool: &PgPool, export: &Export) -> Self {
-        Self {
-            pool: pool.clone(),
-            export: export.clone(),
-        }
-    }
-
-    async fn export(
-        &self,
-        db: &mut es_entity::DbOp<'_>,
-        _: &Customer,
-        events: impl Iterator<Item = &PersistedEvent<CustomerEvent>>,
-    ) -> Result<(), CustomerError> {
-        self.export
-            .es_entity_export(db, BQ_TABLE_NAME, events)
-            .await?;
-        Ok(())
+    pub(super) fn new(pool: &PgPool) -> Self {
+        Self { pool: pool.clone() }
     }
 }
diff --git a/lana/app/src/data_export/cala/error.rs b/lana/app/src/data_export/cala/error.rs
deleted file mode 100644
index 7dfe005b8d..0000000000
--- a/lana/app/src/data_export/cala/error.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-use thiserror::Error;
-
-#[derive(Error, Debug)]
-pub enum CalaError {
-    #[error("CalaError - Reqwest: {0}")]
-    Reqwest(#[from] reqwest::Error),
-    #[error("CalaError - UnknownGqlError: {0}")]
-    UnknownGqlError(String),
-}
-
-impl From<Vec<graphql_client::Error>> for CalaError {
-    fn from(errors: Vec<graphql_client::Error>) -> Self {
-        let mut error_string = String::new();
-        for error in errors {
-            error_string.push_str(&format!("{:?}\n", error));
-        }
-        CalaError::UnknownGqlError(error_string)
-    }
-}
diff --git a/lana/app/src/data_export/cala/graphql/mod.rs b/lana/app/src/data_export/cala/graphql/mod.rs
deleted file mode 100644
index f23ed4f9f4..0000000000
--- a/lana/app/src/data_export/cala/graphql/mod.rs
+++ /dev/null
@@ -1,15 +0,0 @@
-#![allow(clippy::enum_variant_names)]
-#![allow(clippy::derive_partial_eq_without_eq)]
-#![allow(clippy::upper_case_acronyms)]
-use graphql_client::GraphQLQuery;
-
-#[derive(GraphQLQuery)]
-#[graphql(
-    schema_path = "src/ledger/cala/graphql/schema.graphql",
-    query_path = "src/data_export/cala/graphql/row-insert.gql",
-    response_derives = "Debug, PartialEq, Eq, Clone"
-)]
-pub struct RowInsert;
-
-type UUID = uuid::Uuid;
-type JSON = serde_json::Value;
diff --git a/lana/app/src/data_export/cala/graphql/row-insert.gql b/lana/app/src/data_export/cala/graphql/row-insert.gql
deleted file mode 100644
index f1e0a6f13c..0000000000
--- a/lana/app/src/data_export/cala/graphql/row-insert.gql
+++ /dev/null
@@ -1,14 +0,0 @@
-mutation RowInsert($tableName: String!, $insertId: String!, $rowData: JSON!) {
-  bigQuery {
-    rowInsert(input: {
-      integrationId: "00000000-0000-0000-0000-000000000001"
-      tableName: $tableName
-      insertId: $insertId
-      rowData: $rowData
-    }) {
-      integration {
-        integrationId
-      }
-    }
-  }
-}
diff --git a/lana/app/src/data_export/cala/mod.rs b/lana/app/src/data_export/cala/mod.rs
deleted file mode 100644
index 3d10bc70b5..0000000000
--- a/lana/app/src/data_export/cala/mod.rs
+++ /dev/null
@@ -1,96 +0,0 @@
-pub mod error;
-mod graphql;
-
-use graphql_client::{GraphQLQuery, Response};
-use reqwest::Client as ReqwestClient;
-
-use super::{ExportEntityEventData, ExportPriceData, ExportSumsubApplicantData};
-
-use error::*;
-use graphql::*;
-
-#[derive(Clone)]
-pub struct CalaClient {
-    url: String,
-    client: ReqwestClient,
-}
-
-impl CalaClient {
-    pub fn new(url: String) -> Self {
-        let client = ReqwestClient::new();
-        CalaClient { client, url }
-    }
-
-    pub async fn export_price_data(
-        &self,
-        table_name: &str,
-        data: ExportPriceData,
-    ) -> Result<(), CalaError> {
-        let insert_id = uuid::Uuid::new_v4().to_string();
-        tracing::Span::current().record("insert_id", &insert_id);
-        let variables = row_insert::Variables {
-            insert_id,
-            table_name: table_name.to_string(),
-            row_data: serde_json::to_value(data).expect("Could not serialize price data"),
-        };
-        Self::traced_gql_request::<RowInsert, _>(&self.client, &self.url, variables).await?;
-        Ok(())
-    }
-
-    pub async fn export_applicant_data(
-        &self,
-        table_name: &str,
-        data: ExportSumsubApplicantData,
-    ) -> Result<(), CalaError> {
-        let insert_id = uuid::Uuid::new_v4().to_string();
-        tracing::Span::current().record("insert_id", &insert_id);
-        let variables = row_insert::Variables {
-            insert_id,
-            table_name: table_name.to_string(),
-            row_data: serde_json::to_value(data).expect("Could not serialize event"),
-        };
-        Self::traced_gql_request::<RowInsert, _>(&self.client, &self.url, variables).await?;
-        Ok(())
-    }
-
-    pub async fn export_entity_event_to_bq(
-        &self,
-        table_name: &str,
-        data: &ExportEntityEventData,
-    ) -> Result<(), CalaError> {
-        let insert_id = format!("{}:{}", data.id, data.sequence);
-        tracing::Span::current().record("insert_id", &insert_id);
-        let variables = row_insert::Variables {
-            insert_id: format!("{}:{}", data.id, data.sequence),
-            table_name: table_name.to_string(),
-            row_data: serde_json::to_value(data).expect("Could not serialize event"),
-        };
-        Self::traced_gql_request::<RowInsert, _>(&self.client, &self.url, variables).await?;
-        Ok(())
-    }
-
-    async fn traced_gql_request<Q: GraphQLQuery, U: reqwest::IntoUrl>(
-        client: &ReqwestClient,
-        url: U,
-        variables: Q::Variables,
-    ) -> Result<Response<Q::ResponseData>, CalaError>
-    where
-        <Q as GraphQLQuery>::ResponseData: std::fmt::Debug,
-    {
-        let trace_headers = tracing_utils::http::inject_trace();
-        let body = Q::build_query(variables);
-        let response = client
-            .post(url)
-            .headers(trace_headers)
-            .json(&body)
-            .send()
-            .await?;
-        let response = response.json::<Response<Q::ResponseData>>().await?;
-
-        if let Some(errors) = response.errors {
-            return Err(CalaError::from(errors));
-        }
-
-        Ok(response)
-    }
-}
diff --git a/lana/app/src/data_export/error.rs b/lana/app/src/data_export/error.rs
deleted file mode 100644
index 0a558f9e92..0000000000
--- a/lana/app/src/data_export/error.rs
+++ /dev/null
@@ -1,7 +0,0 @@
-use thiserror::Error;
-
-#[derive(Error, Debug)]
-pub enum ExportError {
-    #[error("ExportError - CalaError: {0}")]
-    Cala(#[from] super::cala::error::CalaError),
-}
diff --git a/lana/app/src/data_export/job.rs b/lana/app/src/data_export/job.rs
deleted file mode 100644
index d4644fe5bf..0000000000
--- a/lana/app/src/data_export/job.rs
+++ /dev/null
@@ -1,53 +0,0 @@
-#![allow(clippy::blocks_in_conditions)]
-use async_trait::async_trait;
-use serde::{Deserialize, Serialize};
-
-use std::borrow::Cow;
-
-use crate::job::*;
-
-use super::{cala::CalaClient, ExportEntityEventData};
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct DataExportConfig {
-    pub(super) cala_url: String,
-    pub(super) table_name: Cow<'static, str>,
-    pub(super) data: ExportEntityEventData,
-}
-
-impl JobConfig for DataExportConfig {
-    type Initializer = DataExportInitializer;
-}
-
-pub struct DataExportInitializer;
-
-const DATA_EXPORT_JOB: JobType = JobType::new("data-export");
-impl JobInitializer for DataExportInitializer {
-    fn job_type() -> JobType
-    where
-        Self: Sized,
-    {
-        DATA_EXPORT_JOB
-    }
-
-    fn init(&self, job: &Job) -> Result<Box<dyn JobRunner>, Box<dyn std::error::Error>> {
-        Ok(Box::new(DataExportJobRunner {
-            config: job.config()?,
-        }))
-    }
-}
-
-pub struct DataExportJobRunner {
-    config: DataExportConfig,
-}
-
-#[async_trait]
-impl JobRunner for DataExportJobRunner {
-    #[tracing::instrument(name = "lana.data_export.job.run", skip_all, fields(insert_id), err)]
-    async fn run(&self, _: CurrentJob) -> Result<JobCompletion, Box<dyn std::error::Error>> {
-        let cala = CalaClient::new(self.config.cala_url.clone());
-        cala.export_entity_event_to_bq(&self.config.table_name, &self.config.data)
-            .await?;
-        Ok(JobCompletion::Complete)
-    }
-}
diff --git a/lana/app/src/data_export/mod.rs b/lana/app/src/data_export/mod.rs
deleted file mode 100644
index fd64cf0338..0000000000
--- a/lana/app/src/data_export/mod.rs
+++ /dev/null
@@ -1,127 +0,0 @@
-mod cala;
-pub mod error;
-mod job;
-
-use chrono::{DateTime, Utc};
-use serde::{Deserialize, Serialize};
-use tracing::instrument;
-
-use crate::{
-    job::{error::JobError, Jobs},
-    primitives::{CustomerId, JobId, PriceOfOneBTC},
-};
-
-use cala::*;
-use error::ExportError;
-use job::{DataExportConfig, DataExportInitializer};
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct ExportEntityEventData {
-    id: uuid::Uuid,
-    event_type: String,
-    event: String,
-    sequence: usize,
-    recorded_at: DateTime<Utc>,
-}
-
-#[derive(Clone, Serialize, Deserialize)]
-pub enum SumsubContentType {
-    Webhook,
-    SensitiveInfo,
-}
-
-#[derive(Clone, Serialize, Deserialize)]
-pub struct ExportSumsubApplicantData {
-    pub customer_id: CustomerId,
-    pub content_type: SumsubContentType,
-    pub content: String,
-    pub uploaded_at: DateTime<Utc>,
-}
-
-#[derive(Clone, Serialize, Deserialize)]
-pub struct ExportPriceData {
-    pub usd_cents_per_btc: PriceOfOneBTC,
-    pub uploaded_at: DateTime<Utc>,
-}
-
-const SUMSUB_EXPORT_TABLE_NAME: &str = "sumsub_applicants";
-const PRICE_EXPORT_TABLE_NAME: &str = "price_cents_btc";
-
-#[derive(Clone)]
-pub struct Export {
-    cala_url: String,
-    jobs: Jobs,
-}
-
-impl Export {
-    pub fn new(cala_url: String, jobs: &Jobs) -> Self {
-        jobs.add_initializer(DataExportInitializer);
-        Self {
-            cala_url,
-            jobs: jobs.clone(),
-        }
-    }
-
-    pub async fn export_price_data(&self, data: ExportPriceData) -> Result<(), ExportError> {
-        let cala = CalaClient::new(self.cala_url.clone());
-        cala.export_price_data(PRICE_EXPORT_TABLE_NAME, data)
-            .await?;
-        Ok(())
-    }
-
-    pub async fn export_sum_sub_applicant_data(
-        &self,
-        data: ExportSumsubApplicantData,
-    ) -> Result<(), ExportError> {
-        let cala = CalaClient::new(self.cala_url.clone());
-        cala.export_applicant_data(SUMSUB_EXPORT_TABLE_NAME, data)
-            .await?;
-        Ok(())
-    }
-
-    #[instrument(name = "lana.export.export_last", skip(self, db, events), err)]
-    pub async fn es_entity_export<T>(
-        &self,
-        db: &mut es_entity::DbOp<'_>,
-        table_name: &'static str,
-        events: impl Iterator<Item = &es_entity::PersistedEvent<T>>,
-    ) -> Result<(), JobError>
-    where
-        T: es_entity::EsEvent + 'static,
-        <T as es_entity::EsEvent>::EntityId: Into<uuid::Uuid> + std::fmt::Display + Copy,
-    {
-        for persisted_event in events {
-            let id = persisted_event.entity_id.into();
-            let event =
-                serde_json::to_value(&persisted_event.event).expect("Couldn't serialize event");
-            let event_type = event
-                .get("type")
-                .expect("Event must have a type")
-                .as_str()
-                .expect("Type must be a string")
-                .to_string();
-            let event = serde_json::to_string(&event).expect("Couldn't serialize event");
-            let sequence = persisted_event.sequence;
-            let recorded_at = persisted_event.recorded_at;
-            let data = ExportEntityEventData {
-                id,
-                event,
-                event_type,
-                sequence,
-                recorded_at,
-            };
-            self.jobs
-                .create_and_spawn_in_op(
-                    db,
-                    JobId::new(),
-                    DataExportConfig {
-                        table_name: std::borrow::Cow::Borrowed(table_name),
-                        cala_url: self.cala_url.clone(),
-                        data,
-                    },
-                )
-                .await?;
-        }
-        Ok(())
-    }
-}
diff --git a/lana/app/src/lib.rs b/lana/app/src/lib.rs
index 2a30fa7d87..8ec61fd3fc 100644
--- a/lana/app/src/lib.rs
+++ b/lana/app/src/lib.rs
@@ -7,7 +7,6 @@ pub mod applicant;
 pub mod authorization;
 pub mod credit_facility;
 pub mod customer;
-pub mod data_export;
 pub mod document;
 pub mod ledger;
 pub mod price;
diff --git a/lana/app/src/price/job.rs b/lana/app/src/price/job.rs
index 00f5aa1545..4e717caadd 100644
--- a/lana/app/src/price/job.rs
+++ b/lana/app/src/price/job.rs
@@ -1,14 +1,9 @@
 use async_trait::async_trait;
-use chrono::Utc;
 use serde::{Deserialize, Serialize};
 use serde_with::serde_as;
 use std::time::Duration;
 
-use crate::{
-    data_export::{Export, ExportPriceData},
-    job::*,
-    price::Price,
-};
+use crate::{job::*, price::Price};
 
 #[serde_as]
 #[derive(Clone, Serialize, Deserialize)]
@@ -35,14 +30,12 @@ impl Default for ExportPriceJobConfig {
 
 pub struct ExportPriceInitializer {
     price: Price,
-    export: Export,
 }
 
 impl ExportPriceInitializer {
-    pub fn new(price: &Price, export: &Export) -> Self {
+    pub fn new(price: &Price) -> Self {
         Self {
             price: price.clone(),
-            export: export.clone(),
         }
     }
 }
@@ -60,7 +53,6 @@ impl JobInitializer for ExportPriceInitializer {
         Ok(Box::new(ExportPriceJobRunner {
             config: job.config()?,
             price: self.price.clone(),
-            export: self.export.clone(),
         }))
     }
 }
@@ -68,19 +60,18 @@ impl JobInitializer for ExportPriceInitializer {
 pub struct ExportPriceJobRunner {
     config: ExportPriceJobConfig,
     price: Price,
-    export: Export,
 }
 
 #[async_trait]
 impl JobRunner for ExportPriceJobRunner {
     async fn run(&self, _: CurrentJob) -> Result<JobCompletion, Box<dyn std::error::Error>> {
-        let price = self.price.usd_cents_per_btc().await?;
-        self.export
-            .export_price_data(ExportPriceData {
-                usd_cents_per_btc: price,
-                uploaded_at: Utc::now(),
-            })
-            .await?;
+        let _price = self.price.usd_cents_per_btc().await?;
+        // self.export
+        //     .export_price_data(ExportPriceData {
+        //         usd_cents_per_btc: price,
+        //         uploaded_at: Utc::now(),
+        //     })
+        //     .await?;
 
         Ok(JobCompletion::RescheduleIn(self.config.job_interval_secs))
diff --git a/lana/app/src/price/mod.rs b/lana/app/src/price/mod.rs
index 9b86e1a300..78e95740bf 100644
--- a/lana/app/src/price/mod.rs
+++ b/lana/app/src/price/mod.rs
@@ -4,7 +4,6 @@ use cached::proc_macro::cached;
 mod job;
 
 use crate::{
-    data_export::Export,
     job::Jobs,
     primitives::{PriceOfOneBTC, UsdCents},
 };
@@ -19,14 +18,14 @@ pub struct Price {
 }
 
 impl Price {
-    pub async fn init(jobs: &Jobs, export: &Export) -> Result<Self, PriceError> {
+    pub async fn init(jobs: &Jobs) -> Result<Self, PriceError> {
         let price = Self {
             bfx: BfxClient::new(),
             _jobs: jobs.clone(),
         };
 
         jobs.add_initializer_and_spawn_unique(
-            job::ExportPriceInitializer::new(&price, export),
+            job::ExportPriceInitializer::new(&price),
             job::ExportPriceJobConfig::default(),
         )
         .await?;
diff --git a/lana/app/src/report/mod.rs b/lana/app/src/report/mod.rs
index 0ead9a47a1..e1a4da08c3 100644
--- a/lana/app/src/report/mod.rs
+++ b/lana/app/src/report/mod.rs
@@ -11,7 +11,6 @@ use tracing::instrument;
 
 use crate::{
     authorization::{Authorization, Object, ReportAction},
-    data_export::Export,
     job::Jobs,
     primitives::{ReportId, Subject},
     storage::Storage,
@@ -38,9 +37,8 @@ impl Reports {
         authz: &Authorization,
         jobs: &Jobs,
         storage: &Storage,
-        export: &Export,
     ) -> Result<Self, ReportError> {
-        let repo = ReportRepo::new(pool, export);
+        let repo = ReportRepo::new(pool);
         jobs.add_initializer(report_jobs::generate::GenerateReportInitializer::new(
             &repo,
             config,
diff --git a/lana/app/src/report/repo.rs b/lana/app/src/report/repo.rs
index c53abfde68..a6dd7e7a02 100644
--- a/lana/app/src/report/repo.rs
+++ b/lana/app/src/report/repo.rs
@@ -2,36 +2,18 @@ use sqlx::PgPool;
 
 use es_entity::*;
 
-use crate::{data_export::Export, primitives::ReportId};
+use crate::primitives::ReportId;
 
 use super::{entity::*, error::*};
 
-const BQ_TABLE_NAME: &str = "report_events";
-
 #[derive(EsRepo, Clone)]
-#[es_repo(entity = "Report", err = "ReportError", post_persist_hook = "export")]
+#[es_repo(entity = "Report", err = "ReportError")]
 pub struct ReportRepo {
     pool: PgPool,
-    export: Export,
 }
 
 impl ReportRepo {
-    pub(super) fn new(pool: &PgPool, export: &Export) -> Self {
-        Self {
-            pool: pool.clone(),
-            export: export.clone(),
-        }
-    }
-
-    async fn export(
-        &self,
-        db: &mut es_entity::DbOp<'_>,
-        _: &Report,
-        events: impl Iterator<Item = &PersistedEvent<ReportEvent>>,
-    ) -> Result<(), ReportError> {
-        self.export
-            .es_entity_export(db, BQ_TABLE_NAME, events)
-            .await?;
-        Ok(())
+    pub(super) fn new(pool: &PgPool) -> Self {
+        Self { pool: pool.clone() }
     }
 }
diff --git a/lana/app/src/terms_template/mod.rs b/lana/app/src/terms_template/mod.rs
index 0afbe04c39..f5e29d9c35 100644
--- a/lana/app/src/terms_template/mod.rs
+++ b/lana/app/src/terms_template/mod.rs
@@ -10,7 +10,6 @@ use tracing::instrument;
 use crate::{
     audit::AuditInfo,
     authorization::{Authorization, Object, TermsTemplateAction},
-    data_export::Export,
     primitives::{Subject, TermsTemplateId},
     terms::TermValues,
 };
@@ -26,8 +25,8 @@ pub struct TermsTemplates {
 }
 
 impl TermsTemplates {
-    pub fn new(pool: &sqlx::PgPool, authz: &Authorization, export: &Export) -> Self {
-        let repo = TermsTemplateRepo::new(pool, export);
+    pub fn new(pool: &sqlx::PgPool, authz: &Authorization) -> Self {
+        let repo = TermsTemplateRepo::new(pool);
         Self {
             authz: authz.clone(),
             repo,
diff --git a/lana/app/src/terms_template/repo.rs b/lana/app/src/terms_template/repo.rs
index 874156bfe2..f1cdcd9775 100644
--- a/lana/app/src/terms_template/repo.rs
+++ b/lana/app/src/terms_template/repo.rs
@@ -2,41 +2,22 @@ use sqlx::PgPool;
 
 use es_entity::*;
 
-use crate::{data_export::Export, primitives::*};
+use crate::primitives::*;
 
 use super::{entity::*, error::*};
 
-const BQ_TABLE_NAME: &str = "terms_template_events";
-
 #[derive(EsRepo, Clone)]
 #[es_repo(
     entity = "TermsTemplate",
     err = "TermsTemplateError",
-    columns(name(ty = "String", list_by)),
-    post_persist_hook = "export"
+    columns(name(ty = "String", list_by))
 )]
 pub struct TermsTemplateRepo {
     pool: PgPool,
-    export: Export,
 }
 
 impl TermsTemplateRepo {
-    pub fn new(pool: &PgPool, export: &Export) -> Self {
-        Self {
-            pool: pool.clone(),
-            export: export.clone(),
-        }
-    }
-
-    async fn export(
-        &self,
-        db: &mut es_entity::DbOp<'_>,
-        _: &TermsTemplate,
-        events: impl Iterator<Item = &PersistedEvent<TermsTemplateEvent>>,
-    ) -> Result<(), TermsTemplateError> {
-        self.export
-            .es_entity_export(db, BQ_TABLE_NAME, events)
-            .await?;
-        Ok(())
+    pub fn new(pool: &PgPool) -> Self {
+        Self { pool: pool.clone() }
     }
 }
diff --git a/lana/app/tests/price.rs b/lana/app/tests/price.rs
index c5057215b4..93d6f38a12 100644
--- a/lana/app/tests/price.rs
+++ b/lana/app/tests/price.rs
@@ -1,6 +1,5 @@
 mod helpers;
 use lana_app::{
-    data_export::Export,
     job::{JobExecutorConfig, Jobs},
     price::Price,
 };
@@ -9,8 +8,7 @@
 async fn get_price() -> anyhow::Result<()> {
     let pool = helpers::init_pool().await?;
     let jobs = Jobs::new(&pool, JobExecutorConfig::default());
-    let export = Export::new("".to_string(), &jobs);
-    let price_service = Price::init(&jobs, &export).await?;
+    let price_service = Price::init(&jobs).await?;
     let res = price_service.usd_cents_per_btc().await;
 
     assert!(res.is_ok());