fix: revert "fix: propdefs print debug statements for team 2" (#27358)
oliverb123 authored Jan 8, 2025
1 parent 7d475c6 commit afbc470
Showing 4 changed files with 31 additions and 63 deletions.
21 changes: 10 additions & 11 deletions rust/property-defs-rs/src/app_context.rs
@@ -2,13 +2,13 @@ use health::{HealthHandle, HealthRegistry};
 use quick_cache::sync::Cache;
 use sqlx::{postgres::PgPoolOptions, PgPool};
 use time::Duration;
-use tracing::{debug, warn};
+use tracing::warn;

 use crate::{
     config::Config,
     metrics_consts::{
-        CACHE_WARMING_STATE, GROUP_TYPE_READS, GROUP_TYPE_RESOLVE_TIME, UPDATES_ISSUED,
-        UPDATE_TRANSACTION_TIME,
+        CACHE_WARMING_STATE, GROUP_TYPE_READS, GROUP_TYPE_RESOLVE_TIME, SINGLE_UPDATE_ISSUE_TIME,
+        UPDATES_SKIPPED, UPDATE_TRANSACTION_TIME,
     },
     types::{GroupType, Update},
 };
@@ -61,8 +61,6 @@ impl AppContext {
             metrics::gauge!(CACHE_WARMING_STATE, &[("state", "hot")]).set(1.0);
         }

-        let update_count = updates.len();
-
         let group_type_resolve_time = common_metrics::timing_guard(GROUP_TYPE_RESOLVE_TIME, &[]);
         self.resolve_group_types_indexes(updates).await?;
         group_type_resolve_time.fin();
@@ -72,29 +70,30 @@
             let mut tx = self.pool.begin().await?;

             for update in updates {
-                if update.team_id() == 2 {
-                    debug!("issued update {:?}", update)
-                }
-
+                let issue_time = common_metrics::timing_guard(SINGLE_UPDATE_ISSUE_TIME, &[]);
                 match update.issue(&mut *tx).await {
-                    Ok(_) => {}
+                    Ok(_) => issue_time.label("outcome", "success"),
                     Err(sqlx::Error::Database(e)) if e.constraint().is_some() => {
                         // If we hit a constraint violation, we just skip the update. We see
                         // this in production for group-type-indexes not being resolved, and it's
                         // not worth aborting the whole batch for.
                         metrics::counter!(UPDATES_SKIPPED, &[("reason", "constraint_violation")])
                             .increment(1);
                         warn!("Failed to issue update: {:?}", e);
+                        issue_time.label("outcome", "skipped")
                     }
                     Err(e) => {
                         tx.rollback().await?;
+                        issue_time.label("outcome", "abort");
                         return Err(e);
                     }
                 }
+                .fin();
             }
             tx.commit().await?;
         }
         transaction_time.fin();

-        metrics::counter!(UPDATES_ISSUED).increment(update_count as u64);
         Ok(())
     }

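Aside on the restored timing pattern: common_metrics::timing_guard starts a timer, the label() calls tag it with the eventual outcome, and fin() records the elapsed time once per update, which is why .fin() hangs off the whole match expression. A minimal sketch of the same idea against the metrics crate directly (record_issue_time is a hypothetical helper, not part of this repo; the common_metrics guard itself is internal):

use std::time::Instant;

// Hypothetical stand-in for common_metrics::timing_guard: time one update's
// issue() call and tag the histogram sample with its outcome.
const SINGLE_UPDATE_ISSUE_TIME: &str = "prop_defs_single_update_issue_time_ms";

fn record_issue_time(start: Instant, outcome: &'static str) {
    let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
    metrics::histogram!(SINGLE_UPDATE_ISSUE_TIME, &[("outcome", outcome)]).record(elapsed_ms);
}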
38 changes: 1 addition & 37 deletions rust/property-defs-rs/src/lib.rs
@@ -12,7 +12,7 @@ use metrics_consts::{
 };
 use quick_cache::sync::Cache;
 use tokio::sync::mpsc::{self, error::TrySendError};
-use tracing::{debug, error, warn};
+use tracing::{error, warn};
 use types::{Event, Update};

 pub mod app_context;
@@ -150,13 +150,6 @@ pub async fn update_producer_loop(
             }
         };

-        let team_id = event.team_id;
-        let event_name = event.event.clone();
-
-        if team_id == 2 {
-            debug!("Received event: {:?}", event_name);
-        }
-
         // Panicking on offset store failure, same reasoning as the panic above - if kafka's down, we're down
         offset.store().expect("Failed to store offset");

@@ -165,37 +158,17 @@
             continue;
         }

-        if team_id == 2 {
-            debug!("Processing event: {:?}", event_name);
-        }
-
         let updates = event.into_updates(skip_threshold);

-        if team_id == 2 {
-            debug!("Event {} has {} updates", event_name, updates.len());
-        }
-
         metrics::counter!(EVENTS_RECEIVED).increment(1);
         metrics::counter!(UPDATES_SEEN).increment(updates.len() as u64);
         metrics::histogram!(UPDATES_PER_EVENT).record(updates.len() as f64);

         for update in updates {
             if batch.contains(&update) {
-                if team_id == 2 {
-                    debug!(
-                        "Dropping duplicate update: {:?} for event {}",
-                        update, event_name
-                    );
-                }
                 metrics::counter!(COMPACTED_UPDATES).increment(1);
                 continue;
             }
-            if team_id == 2 {
-                debug!(
-                    "Adding update: {:?} for event {} to batch",
-                    update, event_name
-                );
-            }
             batch.insert(update);
         }

@@ -209,19 +182,10 @@
             last_send = tokio::time::Instant::now();
             for update in batch.drain() {
                 if shared_cache.get(&update).is_some() {
-                    if team_id == 2 {
-                        debug!("Filtered update: {:?} for event {}", update, event_name);
-                    }
                     metrics::counter!(UPDATES_FILTERED_BY_CACHE).increment(1);
                     continue;
                 }
                 shared_cache.insert(update.clone(), ());
-                if team_id == 2 {
-                    debug!(
-                        "Sending update: {:?} for event {} to insert worker",
-                        update, event_name
-                    );
-                }
                 match channel.try_send(update) {
                     Ok(_) => {}
                     Err(TrySendError::Full(update)) => {
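For orientation, the producer loop these debug lines lived in dedupes updates in two stages: a per-batch set compacts duplicates as they arrive, and a shared cache drops updates that any recent batch already sent. A minimal sketch of the drain step under simplified assumptions (Update as a plain String stands in for the real enum in types.rs; drain_batch is an illustrative name):

use std::collections::HashSet;

// Simplified stand-in for the real Update enum.
type Update = String;

fn drain_batch(
    batch: &mut HashSet<Update>,
    shared_cache: &quick_cache::sync::Cache<Update, ()>,
    channel: &tokio::sync::mpsc::Sender<Update>,
) {
    for update in batch.drain() {
        // Stage two: skip anything some recent batch already sent.
        if shared_cache.get(&update).is_some() {
            continue;
        }
        shared_cache.insert(update.clone(), ());
        // Non-blocking send; the real loop matches on the error cases
        // (e.g. TrySendError::Full) instead of discarding them.
        let _ = channel.try_send(update);
    }
}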
1 change: 1 addition & 0 deletions rust/property-defs-rs/src/metrics_consts.rs
@@ -22,3 +22,4 @@ pub const SKIPPED_DUE_TO_TEAM_FILTER: &str = "prop_defs_skipped_due_to_team_filt
 pub const ISSUE_FAILED: &str = "prop_defs_issue_failed";
 pub const CHUNK_SIZE: &str = "prop_defs_chunk_size";
 pub const DUPLICATES_IN_BATCH: &str = "prop_defs_duplicates_in_batch";
+pub const SINGLE_UPDATE_ISSUE_TIME: &str = "prop_defs_single_update_issue_time_ms";
34 changes: 19 additions & 15 deletions rust/property-defs-rs/src/types.rs
@@ -7,7 +7,7 @@ use sqlx::{Executor, Postgres};
 use tracing::warn;
 use uuid::Uuid;

-use crate::metrics_consts::{EVENTS_SKIPPED, UPDATES_SKIPPED};
+use crate::metrics_consts::{EVENTS_SKIPPED, UPDATES_ISSUED, UPDATES_SKIPPED};

 // We skip updates for events we generate
 pub const EVENTS_WITHOUT_PROPERTIES: [&str; 1] = ["$$plugin_metrics"];
@@ -131,14 +131,6 @@ impl Update {
             Update::EventProperty(ep) => ep.issue(executor).await,
         }
     }
-
-    pub fn team_id(&self) -> i32 {
-        match self {
-            Update::Event(e) => e.team_id,
-            Update::Property(p) => p.team_id,
-            Update::EventProperty(ep) => ep.team_id,
-        }
-    }
 }

#[derive(Clone, Debug, Deserialize, Serialize)]
@@ -432,7 +424,7 @@ impl EventDefinition {
     where
         E: Executor<'c, Database = Postgres>,
     {
-        sqlx::query!(
+        let res = sqlx::query!(
             r#"
             INSERT INTO posthog_eventdefinition (id, name, volume_30_day, query_usage_30_day, team_id, project_id, last_seen_at, created_at)
             VALUES ($1, $2, NULL, NULL, $3, $4, $5, NOW()) ON CONFLICT
@@ -444,7 +436,11 @@
             self.team_id,
             self.project_id,
             Utc::now() // We floor the update datetime to the nearest day for cache purposes, but can insert the exact time we see the event
-        ).execute(executor).await.map(|_| ())
+        ).execute(executor).await.map(|_| ());
+
+        metrics::counter!(UPDATES_ISSUED, &[("type", "event_definition")]).increment(1);
+
+        res
}
}

@@ -476,7 +472,7 @@ impl PropertyDefinition {
             return Ok(());
         }

-        sqlx::query!(
+        let res = sqlx::query!(
             r#"
             INSERT INTO posthog_propertydefinition (id, name, type, group_type_index, is_numerical, volume_30_day, query_usage_30_day, team_id, project_id, property_type)
             VALUES ($1, $2, $3, $4, $5, NULL, NULL, $6, $7, $8)
@@ -491,7 +487,11 @@
             self.team_id,
             self.project_id,
             self.property_type.as_ref().map(|t| t.to_string())
-        ).execute(executor).await.map(|_| ())
+        ).execute(executor).await.map(|_| ());
+
+        metrics::counter!(UPDATES_ISSUED, &[("type", "property_definition")]).increment(1);
+
+        res
}
}

@@ -500,7 +500,7 @@ impl EventProperty {
     where
         E: Executor<'c, Database = Postgres>,
     {
-        sqlx::query!(
+        let res = sqlx::query!(
             r#"INSERT INTO posthog_eventproperty (event, property, team_id, project_id) VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING"#,
             self.event,
             self.property,
@@ -509,7 +509,11 @@
         )
         .execute(executor)
         .await
-        .map(|_| ())
+        .map(|_| ());
+
+        metrics::counter!(UPDATES_ISSUED, &[("type", "event_property")]).increment(1);
+
+        res
}
}

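All three restored issue() implementations share one shape: run an idempotent upsert, bump UPDATES_ISSUED with a per-type label, and return the query result. A condensed sketch of the EventProperty case, using the runtime-checked sqlx::query in place of the compile-time query! macro so it builds without a live schema (the free function, its parameter types, and the metric's string value are illustrative assumptions, not the repo's API):

use sqlx::{Executor, Postgres};

// Metric name assumed for the sketch; the real value lives in metrics_consts.rs.
const UPDATES_ISSUED: &str = "prop_defs_updates_issued";

// Hypothetical condensed version of EventProperty::issue.
async fn issue_event_property<'c, E>(
    executor: E,
    event: &str,
    property: &str,
    team_id: i32,
    project_id: i64,
) -> Result<(), sqlx::Error>
where
    E: Executor<'c, Database = Postgres>,
{
    let res = sqlx::query(
        "INSERT INTO posthog_eventproperty (event, property, team_id, project_id) \
         VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING",
    )
    .bind(event)
    .bind(property)
    .bind(team_id)
    .bind(project_id)
    .execute(executor)
    .await
    .map(|_| ());

    // As in the diff, the counter is bumped whether or not the insert
    // conflicted or errored: it tracks statements issued, not rows written.
    metrics::counter!(UPDATES_ISSUED, &[("type", "event_property")]).increment(1);

    res
}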
