Skip to content

Commit

Permalink
feat: Pull event/property definitions into its own service, so we can…
Browse files Browse the repository at this point in the history
… get it out of the plugin server (#24166)
  • Loading branch information
oliverb123 authored Aug 27, 2024
1 parent d71d24d commit f0d94b0
Show file tree
Hide file tree
Showing 12 changed files with 760 additions and 3 deletions.
1 change: 1 addition & 0 deletions .github/workflows/rust-docker-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ jobs:
- hook-worker
- cyclotron-janitor
- cyclotron-fetch
- property-defs-rs
runs-on: depot-ubuntu-22.04-4
permissions:
id-token: write # allow issuing OIDC tokens for this workflow run
Expand Down
36 changes: 34 additions & 2 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
resolver = "2"

members = [
"property-defs-rs",
"capture",
"common/health",
"common/metrics",
Expand Down Expand Up @@ -40,7 +41,7 @@ axum = { version = "0.7.5", features = ["http2", "macros", "matched-path"] }
axum-client-ip = "0.6.0"
base64 = "0.22.0"
bytes = "1"
chrono = { version = "0.4", features = ["default", "serde"]}
chrono = { version = "0.4.38", features = ["default", "serde"] }
envconfig = "0.10.0"
eyre = "0.6.9"
flate2 = "1.0"
Expand Down
13 changes: 13 additions & 0 deletions rust/common/serve_metrics/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "serve-metrics"
version = "0.1.0"
edition = "2021"

[lints]
workspace = true

[dependencies]
axum = { workspace = true }
tokio = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
metrics = { workspace = true }
82 changes: 82 additions & 0 deletions rust/common/serve_metrics/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use std::time::{Instant, SystemTime};

use axum::{
body::Body, extract::MatchedPath, http::Request, middleware::Next, response::IntoResponse,
routing::get, Router,
};
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};

/// Bind a `TcpListener` on the provided bind address to serve a `Router` on it.
/// This function is intended to take a Router as returned by `setup_metrics_router`, potentially with more routes added by the caller.
pub async fn serve(router: Router, bind: &str) -> Result<(), std::io::Error> {
let listener = tokio::net::TcpListener::bind(bind).await?;

axum::serve(listener, router).await?;

Ok(())
}

/// Add the prometheus endpoint and middleware to a router, should be called last.
pub fn setup_metrics_routes(router: Router) -> Router {
let recorder_handle = setup_metrics_recorder();

router
.route(
"/metrics",
get(move || std::future::ready(recorder_handle.render())),
)
.layer(axum::middleware::from_fn(track_metrics))
}

pub fn setup_metrics_recorder() -> PrometheusHandle {
const BUCKETS: &[f64] = &[
0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 50.0, 100.0, 250.0,
];

PrometheusBuilder::new()
.set_buckets(BUCKETS)
.unwrap()
.install_recorder()
.unwrap()
}

/// Middleware to record some common HTTP metrics
/// Someday tower-http might provide a metrics middleware: https://github.com/tower-rs/tower-http/issues/57
pub async fn track_metrics(req: Request<Body>, next: Next) -> impl IntoResponse {
let start = Instant::now();

let path = if let Some(matched_path) = req.extensions().get::<MatchedPath>() {
matched_path.as_str().to_owned()
} else {
req.uri().path().to_owned()
};

let method = req.method().clone();

// Run the rest of the request handling first, so we can measure it and get response
// codes.
let response = next.run(req).await;

let latency = start.elapsed().as_secs_f64();
let status = response.status().as_u16().to_string();

let labels = [
("method", method.to_string()),
("path", path),
("status", status),
];

metrics::counter!("http_requests_total", &labels).increment(1);
metrics::histogram!("http_requests_duration_seconds", &labels).record(latency);

response
}

/// Returns the number of seconds since the Unix epoch, to use in prom gauges.
/// Saturates to zero if the system time is set before epoch.
pub fn get_current_timestamp_seconds() -> f64 {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as f64
}
25 changes: 25 additions & 0 deletions rust/property-defs-rs/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
name = "property-defs-rs"
version = "0.1.0"
edition = "2021"

[dependencies]
uuid = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
rdkafka = { workspace = true }
tokio = { workspace = true }
envconfig = {workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
sqlx = { workspace = true }
futures = { workspace = true }
health = { path = "../common/health" }
time = { workspace = true }
axum = { workspace = true }
serve-metrics = { path = "../common/serve_metrics" }
metrics = { workspace = true }
chrono = { workspace = true }

[lints]
workspace = true
36 changes: 36 additions & 0 deletions rust/property-defs-rs/src/app_context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use std::collections::HashSet;

use health::{HealthHandle, HealthRegistry};
use sqlx::{postgres::PgPoolOptions, PgPool};

use crate::{config::Config, metrics_consts::UPDATES_ISSUED, types::Update};

pub struct AppContext {
pub pool: PgPool,
pub liveness: HealthRegistry,
pub worker_liveness: HealthHandle,
}

impl AppContext {
pub async fn new(config: &Config) -> Result<Self, sqlx::Error> {
let options = PgPoolOptions::new().max_connections(config.max_pg_connections);

let pool = options.connect(&config.database_url).await?;

let liveness: HealthRegistry = HealthRegistry::new("liveness");
let worker_liveness = liveness
.register("worker".to_string(), time::Duration::seconds(60))
.await;

Ok(Self {
pool,
liveness,
worker_liveness,
})
}

pub async fn issue(&self, updates: HashSet<Update>) -> Result<(), sqlx::Error> {
metrics::counter!(UPDATES_ISSUED).increment(updates.len() as u64);
Ok(())
}
}
66 changes: 66 additions & 0 deletions rust/property-defs-rs/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use envconfig::Envconfig;
use rdkafka::ClientConfig;

#[derive(Envconfig, Clone)]
pub struct Config {
#[envconfig(default = "postgres://posthog:posthog@localhost:5432/posthog")]
pub database_url: String,

#[envconfig(default = "10")]
pub max_pg_connections: u32,

#[envconfig(nested = true)]
pub kafka: KafkaConfig,

#[envconfig(default = "10")]
pub max_concurrent_transactions: usize,

#[envconfig(default = "10000")]
pub max_batch_size: usize,

// If a worker recieves a batch smaller than this, it will simply not commit the offset and
// sleep for a while, since DB ops/event scales inversely to batch size
#[envconfig(default = "1000")]
pub min_batch_size: usize,

#[envconfig(default = "100")]
pub next_event_wait_timeout_ms: u64,

#[envconfig(from = "BIND_HOST", default = "::")]
pub host: String,

#[envconfig(from = "BIND_PORT", default = "3301")]
pub port: u16,
}

#[derive(Envconfig, Clone)]
pub struct KafkaConfig {
#[envconfig(default = "kafka:9092")]
pub kafka_hosts: String,
#[envconfig(default = "clickhouse_events_json")]
pub event_topic: String,
#[envconfig(default = "false")]
pub kafka_tls: bool,
#[envconfig(default = "false")]
pub verify_ssl_certificate: bool,
#[envconfig(default = "autocomplete-rs")]
pub consumer_group: String,
}

impl From<&KafkaConfig> for ClientConfig {
fn from(config: &KafkaConfig) -> Self {
let mut client_config = ClientConfig::new();
client_config
.set("bootstrap.servers", &config.kafka_hosts)
.set("statistics.interval.ms", "10000")
.set("group.id", config.consumer_group.clone());

if config.kafka_tls {
client_config.set("security.protocol", "ssl").set(
"enable.ssl.certificate.verification",
config.verify_ssl_certificate.to_string(),
);
};
client_config
}
}
4 changes: 4 additions & 0 deletions rust/property-defs-rs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod app_context;
pub mod config;
pub mod metrics_consts;
pub mod types;
Loading

0 comments on commit f0d94b0

Please sign in to comment.