From afbc47019ae04414146582aeb962537658e1a801 Mon Sep 17 00:00:00 2001
From: Oliver Browne
Date: Wed, 8 Jan 2025 13:19:14 +0000
Subject: [PATCH] fix: revert "fix: propdefs print debug statements for team 2" (#27358)

---
 rust/property-defs-rs/src/app_context.rs    | 21 ++++++------
 rust/property-defs-rs/src/lib.rs            | 38 +--------------------
 rust/property-defs-rs/src/metrics_consts.rs |  1 +
 rust/property-defs-rs/src/types.rs          | 34 ++++++++++--------
 4 files changed, 31 insertions(+), 63 deletions(-)

diff --git a/rust/property-defs-rs/src/app_context.rs b/rust/property-defs-rs/src/app_context.rs
index 05e07b9c6c594..f83484f9b3938 100644
--- a/rust/property-defs-rs/src/app_context.rs
+++ b/rust/property-defs-rs/src/app_context.rs
@@ -2,13 +2,13 @@ use health::{HealthHandle, HealthRegistry};
 use quick_cache::sync::Cache;
 use sqlx::{postgres::PgPoolOptions, PgPool};
 use time::Duration;
-use tracing::{debug, warn};
+use tracing::warn;
 
 use crate::{
     config::Config,
     metrics_consts::{
-        CACHE_WARMING_STATE, GROUP_TYPE_READS, GROUP_TYPE_RESOLVE_TIME, UPDATES_ISSUED,
-        UPDATE_TRANSACTION_TIME,
+        CACHE_WARMING_STATE, GROUP_TYPE_READS, GROUP_TYPE_RESOLVE_TIME, SINGLE_UPDATE_ISSUE_TIME,
+        UPDATES_SKIPPED, UPDATE_TRANSACTION_TIME,
     },
     types::{GroupType, Update},
 };
@@ -61,8 +61,6 @@ impl AppContext {
             metrics::gauge!(CACHE_WARMING_STATE, &[("state", "hot")]).set(1.0);
         }
 
-        let update_count = updates.len();
-
         let group_type_resolve_time = common_metrics::timing_guard(GROUP_TYPE_RESOLVE_TIME, &[]);
         self.resolve_group_types_indexes(updates).await?;
         group_type_resolve_time.fin();
@@ -72,29 +70,30 @@ impl AppContext {
             let mut tx = self.pool.begin().await?;
 
             for update in updates {
-                if update.team_id() == 2 {
-                    debug!("issued update {:?}", update)
-                }
-
+                let issue_time = common_metrics::timing_guard(SINGLE_UPDATE_ISSUE_TIME, &[]);
                 match update.issue(&mut *tx).await {
-                    Ok(_) => {}
+                    Ok(_) => issue_time.label("outcome", "success"),
                     Err(sqlx::Error::Database(e)) if e.constraint().is_some() => {
                         // If we hit a constraint violation, we just skip the update. We see
                         // this in production for group-type-indexes not being resolved, and it's
                         // not worth aborting the whole batch for.
+                        metrics::counter!(UPDATES_SKIPPED, &[("reason", "constraint_violation")])
+                            .increment(1);
                         warn!("Failed to issue update: {:?}", e);
+                        issue_time.label("outcome", "skipped")
                     }
                     Err(e) => {
                         tx.rollback().await?;
+                        issue_time.label("outcome", "abort");
                         return Err(e);
                     }
                 }
+                .fin();
             }
 
             tx.commit().await?;
         }
         transaction_time.fin();
 
-        metrics::counter!(UPDATES_ISSUED).increment(update_count as u64);
         Ok(())
     }
diff --git a/rust/property-defs-rs/src/lib.rs b/rust/property-defs-rs/src/lib.rs
index 868f7ba2da1e2..4862335357ac0 100644
--- a/rust/property-defs-rs/src/lib.rs
+++ b/rust/property-defs-rs/src/lib.rs
@@ -12,7 +12,7 @@ use metrics_consts::{
 };
 use quick_cache::sync::Cache;
 use tokio::sync::mpsc::{self, error::TrySendError};
-use tracing::{debug, error, warn};
+use tracing::{error, warn};
 use types::{Event, Update};
 
 pub mod app_context;
@@ -150,13 +150,6 @@ pub async fn update_producer_loop(
             }
         };
 
-        let team_id = event.team_id;
-        let event_name = event.event.clone();
-
-        if team_id == 2 {
-            debug!("Received event: {:?}", event_name);
-        }
-
         // Panicking on offset store failure, same reasoning as the panic above - if kafka's down, we're down
         offset.store().expect("Failed to store offset");
 
@@ -165,37 +158,17 @@ pub async fn update_producer_loop(
             continue;
         }
 
-        if team_id == 2 {
-            debug!("Processing event: {:?}", event_name);
-        }
-
         let updates = event.into_updates(skip_threshold);
 
-        if team_id == 2 {
-            debug!("Event {} has {} updates", event_name, updates.len());
-        }
-
         metrics::counter!(EVENTS_RECEIVED).increment(1);
         metrics::counter!(UPDATES_SEEN).increment(updates.len() as u64);
         metrics::histogram!(UPDATES_PER_EVENT).record(updates.len() as f64);
 
         for update in updates {
             if batch.contains(&update) {
-                if team_id == 2 {
-                    debug!(
-                        "Dropping duplicate update: {:?} for event {}",
-                        update, event_name
-                    );
-                }
                 metrics::counter!(COMPACTED_UPDATES).increment(1);
                 continue;
             }
 
-            if team_id == 2 {
-                debug!(
-                    "Adding update: {:?} for event {} to batch",
-                    update, event_name
-                );
-            }
             batch.insert(update);
         }
@@ -209,19 +182,10 @@ pub async fn update_producer_loop(
             last_send = tokio::time::Instant::now();
             for update in batch.drain() {
                 if shared_cache.get(&update).is_some() {
-                    if team_id == 2 {
-                        debug!("Filtered update: {:?} for event {}", update, event_name);
-                    }
                     metrics::counter!(UPDATES_FILTERED_BY_CACHE).increment(1);
                     continue;
                 }
                 shared_cache.insert(update.clone(), ());
-                if team_id == 2 {
-                    debug!(
-                        "Sending update: {:?} for event {} to insert worker",
-                        update, event_name
-                    );
-                }
                 match channel.try_send(update) {
                     Ok(_) => {}
                     Err(TrySendError::Full(update)) => {
diff --git a/rust/property-defs-rs/src/metrics_consts.rs b/rust/property-defs-rs/src/metrics_consts.rs
index 4b29b615c4b47..d0dd725df0e7a 100644
--- a/rust/property-defs-rs/src/metrics_consts.rs
+++ b/rust/property-defs-rs/src/metrics_consts.rs
@@ -22,3 +22,4 @@ pub const SKIPPED_DUE_TO_TEAM_FILTER: &str = "prop_defs_skipped_due_to_team_filt
 pub const ISSUE_FAILED: &str = "prop_defs_issue_failed";
 pub const CHUNK_SIZE: &str = "prop_defs_chunk_size";
 pub const DUPLICATES_IN_BATCH: &str = "prop_defs_duplicates_in_batch";
+pub const SINGLE_UPDATE_ISSUE_TIME: &str = "prop_defs_single_update_issue_time_ms";
diff --git a/rust/property-defs-rs/src/types.rs b/rust/property-defs-rs/src/types.rs
index 08da098fe7e95..3b741add5f317 100644
--- a/rust/property-defs-rs/src/types.rs
+++ b/rust/property-defs-rs/src/types.rs
@@ -7,7 +7,7 @@ use sqlx::{Executor, Postgres};
 use tracing::warn;
 use uuid::Uuid;
 
-use crate::metrics_consts::{EVENTS_SKIPPED, UPDATES_SKIPPED};
+use crate::metrics_consts::{EVENTS_SKIPPED, UPDATES_ISSUED, UPDATES_SKIPPED};
 
 // We skip updates for events we generate
 pub const EVENTS_WITHOUT_PROPERTIES: [&str; 1] = ["$$plugin_metrics"];
@@ -131,14 +131,6 @@ impl Update {
             Update::EventProperty(ep) => ep.issue(executor).await,
         }
     }
-
-    pub fn team_id(&self) -> i32 {
-        match self {
-            Update::Event(e) => e.team_id,
-            Update::Property(p) => p.team_id,
-            Update::EventProperty(ep) => ep.team_id,
-        }
-    }
 }
 
 #[derive(Clone, Debug, Deserialize, Serialize)]
@@ -432,7 +424,7 @@ impl EventDefinition {
     where
         E: Executor<'c, Database = Postgres>,
     {
-        sqlx::query!(
+        let res = sqlx::query!(
             r#"
     INSERT INTO posthog_eventdefinition (id, name, volume_30_day, query_usage_30_day, team_id, project_id, last_seen_at, created_at)
     VALUES ($1, $2, NULL, NULL, $3, $4, $5, NOW()) ON CONFLICT
@@ -444,7 +436,11 @@ impl EventDefinition {
             self.team_id,
             self.project_id,
             Utc::now() // We floor the update datetime to the nearest day for cache purposes, but can insert the exact time we see the event
-        ).execute(executor).await.map(|_| ())
+        ).execute(executor).await.map(|_| ());
+
+        metrics::counter!(UPDATES_ISSUED, &[("type", "event_definition")]).increment(1);
+
+        res
     }
 }
 
@@ -476,7 +472,7 @@ impl PropertyDefinition {
             return Ok(());
         }
 
-        sqlx::query!(
+        let res = sqlx::query!(
             r#"
     INSERT INTO posthog_propertydefinition (id, name, type, group_type_index, is_numerical, volume_30_day, query_usage_30_day, team_id, project_id, property_type)
     VALUES ($1, $2, $3, $4, $5, NULL, NULL, $6, $7, $8)
@@ -491,7 +487,11 @@ impl PropertyDefinition {
             self.team_id,
             self.project_id,
             self.property_type.as_ref().map(|t| t.to_string())
-        ).execute(executor).await.map(|_| ())
+        ).execute(executor).await.map(|_| ());
+
+        metrics::counter!(UPDATES_ISSUED, &[("type", "property_definition")]).increment(1);
+
+        res
     }
 }
 
@@ -500,7 +500,7 @@ impl EventProperty {
     where
         E: Executor<'c, Database = Postgres>,
     {
-        sqlx::query!(
+        let res = sqlx::query!(
            r#"INSERT INTO posthog_eventproperty (event, property, team_id, project_id) VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING"#,
             self.event,
             self.property,
@@ -509,7 +509,11 @@ impl EventProperty {
         )
         .execute(executor)
         .await
-        .map(|_| ())
+        .map(|_| ());
+
+        metrics::counter!(UPDATES_ISSUED, &[("type", "event_property")]).increment(1);
+
+        res
     }
 }
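
Note (not part of the commit): below is a minimal sketch of the instrumentation pattern this revert restores: time each update, count issued updates, and count-and-skip constraint violations instead of failing the whole batch. It talks to the `metrics` facade crate directly instead of PostHog's internal `common_metrics::timing_guard`, and it reuses only macro forms that already appear in the patch. Only the `prop_defs_single_update_issue_time_ms` name comes from the patch; the other metric names, the `issue` stand-in, and its error values are made up for illustration.

use std::time::Instant;

// SINGLE_UPDATE_ISSUE_TIME matches the constant added by the patch; the other
// two names are illustrative stand-ins, not the real prop-defs constants.
const SINGLE_UPDATE_ISSUE_TIME: &str = "prop_defs_single_update_issue_time_ms";
const UPDATES_ISSUED: &str = "prop_defs_updates_issued_sketch";
const UPDATES_SKIPPED: &str = "prop_defs_updates_skipped_sketch";

// Stand-in for `Update::issue`: `Err("constraint")` models a constraint
// violation that should be skipped; any other error aborts the batch.
fn issue(update: &str) -> Result<(), &'static str> {
    match update {
        "duplicate" => Err("constraint"),
        "poison" => Err("fatal"),
        _ => Ok(()),
    }
}

fn issue_batch(updates: &[&str]) -> Result<(), &'static str> {
    for update in updates {
        let start = Instant::now();
        match issue(update) {
            Ok(_) => {
                metrics::counter!(UPDATES_ISSUED).increment(1);
            }
            // Constraint violations are counted and skipped rather than
            // failing the whole batch, mirroring the sqlx::Error::Database
            // arm in app_context.rs.
            Err("constraint") => {
                metrics::counter!(UPDATES_SKIPPED, &[("reason", "constraint_violation")])
                    .increment(1);
            }
            // Any other error is timed and then propagated; the real code
            // also rolls back the open transaction at this point.
            Err(e) => {
                metrics::histogram!(SINGLE_UPDATE_ISSUE_TIME)
                    .record(start.elapsed().as_millis() as f64);
                return Err(e);
            }
        }
        // The patch labels the timing guard with the outcome; this sketch
        // records an unlabelled duration and leaves outcomes to the counters.
        metrics::histogram!(SINGLE_UPDATE_ISSUE_TIME).record(start.elapsed().as_millis() as f64);
    }
    Ok(())
}

fn main() {
    // With no recorder installed the metrics macros are no-ops, so this just
    // exercises the control flow: "duplicate" is skipped, "poison" aborts.
    assert!(issue_batch(&["a", "duplicate", "b"]).is_ok());
    assert!(issue_batch(&["a", "poison", "b"]).is_err());
}

The sketch only needs the `metrics` crate as a dependency; wiring up an exporter such as metrics-exporter-prometheus would make the counters and the histogram visible.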