fix(capture): add ability to configure event dropping by token (#26756)
oliverb123 authored Dec 9, 2024
1 parent cfa779f commit 8955ed1
Showing 9 changed files with 170 additions and 2 deletions.
1 change: 1 addition & 0 deletions rust/capture/src/config.rs
@@ -41,6 +41,7 @@ pub struct Config {
    pub overflow_burst_limit: NonZeroU32,

    pub overflow_forced_keys: Option<String>, // Comma-delimited keys
    pub dropped_keys: Option<String>, // "<token>:<distinct_id or *>,<distinct_id or *>;<token>..."

    #[envconfig(nested = true)]
    pub kafka: KafkaConfig,
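For illustration, a value matching this grammar might look like the sketch below. The tokens and distinct IDs are hypothetical, and the upper-cased DROPPED_KEYS variable name assumes envconfig's default field-to-environment mapping:

    // Hypothetical DROPPED_KEYS value: drop events for two distinct IDs under
    // one token, and every event under a second token via the wildcard.
    let dropped_keys = "phc_abc123:user_1,user_2;phc_def456:*";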
1 change: 1 addition & 0 deletions rust/capture/src/limiters/mod.rs
@@ -1,2 +1,3 @@
pub mod overflow;
pub mod redis;
pub mod token_dropper;
76 changes: 76 additions & 0 deletions rust/capture/src/limiters/token_dropper.rs
@@ -0,0 +1,76 @@
use std::collections::HashMap;

use tracing::warn;

#[derive(Default)]
pub struct TokenDropper {
    to_drop: HashMap<String, Vec<String>>,
}

impl TokenDropper {
    // Takes "<token>:<distinct_id or *>,<distinct_id or *>;<token>..."
    pub fn new(config: &str) -> Self {
        let mut to_drop = HashMap::new();
        for pair in config.split(';') {
            let mut parts = pair.split(':');
            let Some(token) = parts.next() else {
                warn!("No distinct IDs configured for pair {}", pair);
                continue;
            };
            let Some(ids) = parts.next() else {
                warn!("No distinct IDs configured for token {}", token);
                continue;
            };
            let ids = ids.split(',').map(|s| s.to_string()).collect();
            to_drop.insert(token.to_string(), ids);
        }
        Self { to_drop }
    }

    pub fn should_drop(&self, token: &str, distinct_id: &str) -> bool {
        self.to_drop
            .get(token)
            .map(|ids| ids.iter().any(|id| id == distinct_id || id == "*"))
            .unwrap_or(false)
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_empty_config() {
        let dropper = TokenDropper::new("");
        assert!(!dropper.should_drop("token", "id"));
    }

    #[test]
    fn test_single_token_id() {
        let dropper = TokenDropper::new("token:id");
        assert!(dropper.should_drop("token", "id"));
        assert!(!dropper.should_drop("token", "other"));
    }

    #[test]
    fn test_multiple_ids() {
        let dropper = TokenDropper::new("token:id1,id2");
        assert!(dropper.should_drop("token", "id1"));
        assert!(dropper.should_drop("token", "id2"));
        assert!(!dropper.should_drop("token", "id3"));
    }

    #[test]
    fn test_wildcard() {
        let dropper = TokenDropper::new("token:*");
        assert!(dropper.should_drop("token", "anything"));
    }

    #[test]
    fn test_multiple_tokens() {
        let dropper = TokenDropper::new("token1:id1;token2:id2");
        assert!(dropper.should_drop("token1", "id1"));
        assert!(dropper.should_drop("token2", "id2"));
        assert!(!dropper.should_drop("token1", "id2"));
    }
}
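The unit tests above each exercise one feature of the grammar. As a combined sketch (hypothetical tokens and distinct IDs), a single config string can mix multiple IDs, multiple tokens, and the wildcard:

    let dropper = TokenDropper::new("phc_a:alice,bob;phc_b:*");
    assert!(dropper.should_drop("phc_a", "alice"));    // listed distinct ID
    assert!(!dropper.should_drop("phc_a", "carol"));   // not listed
    assert!(dropper.should_drop("phc_b", "anything")); // wildcard matches every ID
    assert!(!dropper.should_drop("phc_c", "alice"));   // unknown token, nothing dropped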
4 changes: 4 additions & 0 deletions rust/capture/src/router.rs
@@ -12,6 +12,7 @@ use tower::limit::ConcurrencyLimitLayer;
use tower_http::cors::{AllowHeaders, AllowOrigin, CorsLayer};
use tower_http::trace::TraceLayer;

use crate::limiters::token_dropper::TokenDropper;
use crate::test_endpoint;
use crate::{limiters::redis::RedisLimiter, redis::Client, sinks, time::TimeSource, v0_endpoint};

@@ -28,6 +29,7 @@ pub struct State {
    pub timesource: Arc<dyn TimeSource + Send + Sync>,
    pub redis: Arc<dyn Client + Send + Sync>,
    pub billing_limiter: RedisLimiter,
    pub token_dropper: Arc<TokenDropper>,
    pub event_size_limit: usize,
}

@@ -46,6 +48,7 @@ pub fn router<
    sink: S,
    redis: Arc<R>,
    billing_limiter: RedisLimiter,
    token_dropper: TokenDropper,
    metrics: bool,
    capture_mode: CaptureMode,
    concurrency_limit: Option<usize>,
@@ -57,6 +60,7 @@
        redis,
        billing_limiter,
        event_size_limit,
        token_dropper: Arc::new(token_dropper),
    };

    // Very permissive CORS policy, as old SDK versions
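A note on the Arc wrapper above: axum clones State for each request handler, so holding the dropper behind an Arc makes those clones cheap reference-count bumps instead of deep copies of the map. A minimal sketch of the sharing, with a hypothetical config string:

    use std::sync::Arc;

    let dropper = Arc::new(TokenDropper::new("phc_abc:*"));
    let for_handler = dropper.clone(); // clones the pointer, not the HashMap
    assert!(for_handler.should_drop("phc_abc", "any_id"));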
9 changes: 9 additions & 0 deletions rust/capture/src/server.rs
@@ -13,6 +13,8 @@ use crate::limiters::overflow::OverflowLimiter;
use crate::limiters::redis::{
    QuotaResource, RedisLimiter, OVERFLOW_LIMITER_CACHE_KEY, QUOTA_LIMITER_CACHE_KEY,
};

use crate::limiters::token_dropper::TokenDropper;
use crate::redis::RedisClient;
use crate::router;
use crate::router::BATCH_BODY_SIZE;
@@ -54,6 +56,11 @@
    )
    .expect("failed to create billing limiter");

    let token_dropper = config
        .dropped_keys
        .map(|k| TokenDropper::new(&k))
        .unwrap_or_default();

    // In Recordings capture mode, we unpack a batch of events, and then pack them back up into
    // a big blob and send to kafka all at once - so we should abort unpacking a batch if the data
    // size crosses the kafka limit. In the Events mode, we can unpack the batch and send each
@@ -78,6 +85,7 @@
        PrintSink {},
        redis_client,
        billing_limiter,
        token_dropper,
        config.export_prometheus,
        config.capture_mode,
        config.concurrency_limit,
@@ -126,6 +134,7 @@
        sink,
        redis_client,
        billing_limiter,
        token_dropper,
        config.export_prometheus,
        config.capture_mode,
        config.concurrency_limit,
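Because TokenDropper derives Default, unwrap_or_default() yields a dropper with an empty map when dropped_keys is unset, so nothing is ever dropped. A quick sketch of that fallback:

    // With no dropped_keys configured, should_drop is always false.
    let dropper = TokenDropper::default();
    assert!(!dropper.should_drop("any_token", "any_distinct_id"));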
22 changes: 20 additions & 2 deletions rust/capture/src/v0_endpoint.rs
@@ -14,6 +14,7 @@ use serde_json::json;
use serde_json::Value;
use tracing::instrument;

use crate::limiters::token_dropper::TokenDropper;
use crate::prometheus::report_dropped_events;
use crate::v0_request::{
    Compression, DataType, ProcessedEvent, ProcessedEventMetadata, ProcessingContext, RawRequest,
@@ -172,7 +173,14 @@ pub async fn event(
        }
        Err(err) => Err(err),
        Ok((context, events)) => {
            if let Err(err) = process_events(
                state.sink.clone(),
                state.token_dropper.clone(),
                &events,
                &context,
            )
            .await
            {
                let cause = match err {
                    CaptureError::EmptyDistinctId => "empty_distinct_id",
                    CaptureError::MissingDistinctId => "missing_distinct_id",
@@ -300,14 +308,24 @@ pub fn process_single_event(
#[instrument(skip_all, fields(events = events.len()))]
pub async fn process_events<'a>(
    sink: Arc<dyn sinks::Event + Send + Sync>,
    dropper: Arc<TokenDropper>,
    events: &'a [RawEvent],
    context: &'a ProcessingContext,
) -> Result<(), CaptureError> {
    let mut events: Vec<ProcessedEvent> = events
        .iter()
        .map(|e| process_single_event(e, context))
        .collect::<Result<Vec<ProcessedEvent>, CaptureError>>()?;

    events.retain(|e| {
        if dropper.should_drop(&e.event.token, &e.event.distinct_id) {
            report_dropped_events("token_dropper", 1);
            false
        } else {
            true
        }
    });

    tracing::debug!(events=?events, "processed {} events", events.len());

    if events.len() == 1 {
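Note the ordering in process_events: the whole batch is validated first (a failure in process_single_event still fails the request), and only then are matching events filtered out, each drop counted via report_dropped_events. From the client's perspective a dropped event is indistinguishable from a captured one, as the integration test further below confirms by asserting a 200 response for the dropped request.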
1 change: 1 addition & 0 deletions rust/capture/tests/common.rs
@@ -39,6 +39,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
    overflow_burst_limit: NonZeroU32::new(5).unwrap(),
    overflow_per_second_limit: NonZeroU32::new(10).unwrap(),
    overflow_forced_keys: None,
    dropped_keys: None,
    kafka: KafkaConfig {
        kafka_producer_linger_ms: 0, // Send messages as soon as possible
        kafka_producer_queue_mib: 10,
2 changes: 2 additions & 0 deletions rust/capture/tests/django_compat.rs
@@ -7,6 +7,7 @@ use base64::Engine;
use capture::api::{CaptureError, CaptureResponse, CaptureResponseCode};
use capture::config::CaptureMode;
use capture::limiters::redis::{QuotaResource, RedisLimiter, QUOTA_LIMITER_CACHE_KEY};
use capture::limiters::token_dropper::TokenDropper;
use capture::redis::MockRedisClient;
use capture::router::router;
use capture::sinks::Event;
@@ -117,6 +118,7 @@ async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> {
        sink.clone(),
        redis,
        billing_limiter,
        TokenDropper::default(),
        false,
        CaptureMode::Events,
        None,
56 changes: 56 additions & 0 deletions rust/capture/tests/events.rs
@@ -40,6 +40,62 @@ async fn it_captures_one_event() -> Result<()> {
    Ok(())
}

#[tokio::test]
async fn it_drops_events_if_dropper_enabled() -> Result<()> {
    setup_tracing();
    let token = random_string("token", 16);
    let distinct_id = random_string("id", 16);
    let dropped_id = random_string("id", 16);

    let main_topic = EphemeralTopic::new().await;
    let histo_topic = EphemeralTopic::new().await;
    let mut config = DEFAULT_CONFIG.clone();
    config.kafka.kafka_topic = main_topic.topic_name().to_string();
    config.kafka.kafka_historical_topic = histo_topic.topic_name().to_string();
    config.dropped_keys = Some(format!("{}:{}", token, dropped_id));
    let server = ServerHandle::for_config(config).await;

    let event = json!({
        "token": token,
        "event": "testing",
        "distinct_id": distinct_id
    });

    let dropped = json!({
        "token": token,
        "event": "testing",
        "distinct_id": dropped_id
    });

    let res = server.capture_events(event.to_string()).await;
    assert_eq!(StatusCode::OK, res.status());
    let res = server.capture_events(dropped.to_string()).await;
    assert_eq!(StatusCode::OK, res.status());
    let res = server.capture_events(event.to_string()).await;
    assert_eq!(StatusCode::OK, res.status());

    let event = main_topic.next_event()?;
    assert_json_include!(
        actual: event,
        expected: json!({
            "token": token,
            "distinct_id": distinct_id
        })
    );

    // Next event we get is identical to the first, because the dropped event is not captured
    let event = main_topic.next_event()?;
    assert_json_include!(
        actual: event,
        expected: json!({
            "token": token,
            "distinct_id": distinct_id
        })
    );

    Ok(())
}

#[tokio::test]
async fn it_captures_a_posthogjs_array() -> Result<()> {
    setup_tracing();
