diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 9a263a87878..2d54adc784b 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1061,7 +1061,7 @@ dependencies = [ "opentelemetry-otlp", "opentelemetry_sdk", "rand", - "rdkafka", + "rdkafka 0.36.2", "redis", "reqwest 0.12.3", "serde", @@ -1155,7 +1155,7 @@ dependencies = [ "envconfig", "futures", "health", - "rdkafka", + "rdkafka 0.36.2", "serde", "serde_json", "thiserror", @@ -1366,7 +1366,7 @@ dependencies = [ "http 1.1.0", "httpmock", "rand", - "rdkafka", + "rdkafka 0.36.2", "reqwest 0.12.3", "serde", "serde_json", @@ -1392,7 +1392,7 @@ dependencies = [ "envconfig", "eyre", "health", - "rdkafka", + "rdkafka 0.36.2", "serde_json", "sqlx", "time", @@ -1426,16 +1426,16 @@ dependencies = [ "chrono", "common-alloc", "common-dns", - "common-kafka", "common-metrics", "common-types", "envconfig", + "futures", "health", "httpmock", "metrics", "mockall", "moka", - "rdkafka", + "rdkafka 0.35.0", "reqwest 0.12.3", "serde", "serde_json", @@ -2347,7 +2347,7 @@ dependencies = [ "health", "hook-common", "metrics", - "rdkafka", + "rdkafka 0.36.2", "serde", "serde_json", "sqlx", @@ -2375,7 +2375,7 @@ dependencies = [ "http 1.1.0", "httpmock", "metrics", - "rdkafka", + "rdkafka 0.36.2", "reqwest 0.12.3", "serde_json", "sqlx", @@ -3841,6 +3841,25 @@ dependencies = [ "bitflags 2.4.2", ] +[[package]] +name = "rdkafka" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f16c17f411935214a5870e40aff9291f8b40a73e97bf8de29e5959c473d5ef33" +dependencies = [ + "futures-channel", + "futures-util", + "libc", + "log", + "rdkafka-sys", + "serde", + "serde_derive", + "serde_json", + "slab", + "tokio", + "tracing", +] + [[package]] name = "rdkafka" version = "0.36.2" diff --git a/rust/cymbal/Cargo.toml b/rust/cymbal/Cargo.toml index 8e54964fae1..84d25c015f7 100644 --- a/rust/cymbal/Cargo.toml +++ b/rust/cymbal/Cargo.toml @@ -4,7 +4,18 @@ version = "0.1.0" edition = "2021" [dependencies] -rdkafka = { workspace = true } +# We use an older version of rdkafka for cymbal specifically because the newer version +# has a bug which causes consumers to hang when offset auto-storing is disabled and the +# program is producing and consuming at the same time. Cymbal is the only rust service +# that does this, and 0.35 is significantly lower-performance than 0.36, so rather than +# rolling back all other rust services (particularly capture and propdefs), we just have +# cymbal use the older version by overriding the workspace setting. The tracking issue +# for this is: https://github.com/fede1024/rust-rdkafka/issues/638 +rdkafka = { version = "0.35.0", features = ["cmake-build", "ssl", "tracing"] } +# We don't rely on common-kafka due to the bug mentioned above +# common-kafka = { path = "../common/kafka" } +# This does force us to take a direct dependency on join_all +futures = { workspace = true } tokio = { workspace = true } envconfig = { workspace = true } tracing = { workspace = true } @@ -14,7 +25,6 @@ axum = { workspace = true } metrics = { workspace = true } common-metrics = { path = "../common/metrics" } common-alloc = { path = "../common/alloc" } -common-kafka = { path = "../common/kafka" } common-types = { path = "../common/types" } common-dns = { path = "../common/dns" } thiserror = { workspace = true } diff --git a/rust/cymbal/src/app_context.rs b/rust/cymbal/src/app_context.rs index a57a0cb1320..fc489b21de7 100644 --- a/rust/cymbal/src/app_context.rs +++ b/rust/cymbal/src/app_context.rs @@ -1,8 +1,4 @@ use aws_config::{BehaviorVersion, Region}; -use common_kafka::{ - kafka_consumer::SingleTopicConsumer, kafka_producer::create_kafka_producer, - kafka_producer::KafkaContext, -}; use health::{HealthHandle, HealthRegistry}; use rdkafka::producer::FutureProducer; use sqlx::{postgres::PgPoolOptions, PgPool}; @@ -14,6 +10,7 @@ use crate::{ config::Config, error::UnhandledError, frames::resolver::Resolver, + hack::kafka::{create_kafka_producer, KafkaContext, SingleTopicConsumer}, symbol_store::{ caching::{Caching, SymbolSetCache}, concurrency, diff --git a/rust/cymbal/src/bin/generate_test_events.rs b/rust/cymbal/src/bin/generate_test_events.rs new file mode 100644 index 00000000000..2e237a056d7 --- /dev/null +++ b/rust/cymbal/src/bin/generate_test_events.rs @@ -0,0 +1,33 @@ +use std::sync::Arc; + +use common_types::ClickHouseEvent; +use cymbal::{ + get_props, + hack::kafka::{create_kafka_producer, send_iter_to_kafka, KafkaConfig}, +}; +use envconfig::Envconfig; +use health::HealthRegistry; + +const EXCEPTION_DATA: &str = include_str!("../../tests/static/raw_ch_exception_list.json"); + +#[tokio::main] +async fn main() { + let config = KafkaConfig::init_from_env().unwrap(); + let health = Arc::new(HealthRegistry::new("test")); + let handle = health + .register("rdkafka".to_string(), std::time::Duration::from_secs(30)) + .await; + let producer = create_kafka_producer(&config, handle).await.unwrap(); + + let exception: ClickHouseEvent = serde_json::from_str(EXCEPTION_DATA).unwrap(); + let exceptions = (0..10000).map(|_| exception.clone()).collect::>(); + get_props(&exception).unwrap(); + + loop { + println!("Sending {} exception kafka", exceptions.len()); + send_iter_to_kafka(&producer, "exception_symbolification_events", &exceptions) + .await + .unwrap(); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } +} diff --git a/rust/cymbal/src/bin/run.sh b/rust/cymbal/src/bin/run.sh new file mode 100755 index 00000000000..694c0f9e5d8 --- /dev/null +++ b/rust/cymbal/src/bin/run.sh @@ -0,0 +1,7 @@ +export KAFKA_CONSUMER_GROUP="cymbal" +export KAFKA_CONSUMER_TOPIC="exception_symbolification_events" +export OBJECT_STORAGE_BUCKET="posthog" +export OBJECT_STORAGE_ACCESS_KEY_ID="object_storage_root_user" +export OBJECT_STORAGE_SECRET_ACCESS_KEY="object_storage_root_password" + +cargo run --bin cymbal diff --git a/rust/cymbal/src/config.rs b/rust/cymbal/src/config.rs index a6323556b7e..3bd5ae0f4c3 100644 --- a/rust/cymbal/src/config.rs +++ b/rust/cymbal/src/config.rs @@ -1,6 +1,7 @@ -use common_kafka::config::{ConsumerConfig, KafkaConfig}; use envconfig::Envconfig; +use crate::hack::kafka::{ConsumerConfig, KafkaConfig}; + #[derive(Envconfig, Clone)] pub struct Config { #[envconfig(from = "BIND_HOST", default = "::")] @@ -70,11 +71,6 @@ pub struct Config { pub frame_cache_ttl_seconds: u64, } -pub enum AwsRegion { - USEast1, - USWest1, -} - impl Config { pub fn init_with_defaults() -> Result { ConsumerConfig::set_defaults("error-tracking-rs", "exception_symbolification_events"); diff --git a/rust/cymbal/src/hack/kafka.rs b/rust/cymbal/src/hack/kafka.rs new file mode 100644 index 00000000000..c57f27d80b0 --- /dev/null +++ b/rust/cymbal/src/hack/kafka.rs @@ -0,0 +1,302 @@ +use envconfig::Envconfig; +use futures::future::join_all; +use health::HealthHandle; +use rdkafka::{ + consumer::{Consumer, StreamConsumer}, + error::KafkaError, + producer::{FutureProducer, FutureRecord, Producer}, + ClientConfig, Message, +}; +use serde::{de::DeserializeOwned, Serialize}; +use serde_json::error::Error as SerdeError; +use std::sync::{Arc, Weak}; +use thiserror::Error; +use tracing::{debug, error, info}; + +/// This module is a straightforward copy-past of common-kafka, due to the issue described in cymbals Cargo.toml +/// It should be deleted ASAP, pending https://github.com/fede1024/rust-rdkafka/issues/638 being resolved + +#[derive(Envconfig, Clone)] +pub struct KafkaConfig { + #[envconfig(default = "20")] + pub kafka_producer_linger_ms: u32, // Maximum time between producer batches during low traffic + + #[envconfig(default = "400")] + pub kafka_producer_queue_mib: u32, // Size of the in-memory producer queue in mebibytes + + #[envconfig(default = "20000")] + pub kafka_message_timeout_ms: u32, // Time before we stop retrying producing a message: 20 seconds + + #[envconfig(default = "none")] + pub kafka_compression_codec: String, // none, gzip, snappy, lz4, zstd + + #[envconfig(default = "false")] + pub kafka_tls: bool, + + #[envconfig(default = "localhost:9092")] + pub kafka_hosts: String, +} + +#[derive(Envconfig, Clone)] +pub struct ConsumerConfig { + pub kafka_consumer_group: String, + pub kafka_consumer_topic: String, +} + +impl ConsumerConfig { + /// Because the consumer config is application specific, we + /// can't set good defaults in the derive macro, so we expose a way + /// for users to set them here before init'ing their main config struct + pub fn set_defaults(consumer_group: &str, consumer_topic: &str) { + if std::env::var("KAFKA_CONSUMER_GROUP").is_err() { + std::env::set_var("KAFKA_CONSUMER_GROUP", consumer_group); + }; + if std::env::var("KAFKA_CONSUMER_TOPIC").is_err() { + std::env::set_var("KAFKA_CONSUMER_TOPIC", consumer_topic); + }; + } +} + +#[derive(Clone)] +pub struct SingleTopicConsumer { + inner: Arc, +} + +struct Inner { + consumer: StreamConsumer, + topic: String, +} + +#[derive(Debug, thiserror::Error)] +pub enum RecvErr { + #[error("Kafka error: {0}")] + Kafka(#[from] KafkaError), + #[error("Serde error: {0}")] + Serde(#[from] serde_json::Error), + #[error("Received empty payload")] + Empty, +} + +#[derive(Debug, thiserror::Error)] +pub enum OffsetErr { + #[error("Kafka error: {0}")] + Kafka(#[from] KafkaError), + #[error("Consumer gone")] + Gone, +} + +impl SingleTopicConsumer { + pub fn new( + common_config: KafkaConfig, + consumer_config: ConsumerConfig, + ) -> Result { + let mut client_config = ClientConfig::new(); + client_config + .set("bootstrap.servers", &common_config.kafka_hosts) + .set("statistics.interval.ms", "10000") + .set("group.id", consumer_config.kafka_consumer_group); + + client_config.set("enable.auto.offset.store", "false"); + + if common_config.kafka_tls { + client_config + .set("security.protocol", "ssl") + .set("enable.ssl.certificate.verification", "false"); + }; + + let consumer: StreamConsumer = client_config.create()?; + consumer.subscribe(&[consumer_config.kafka_consumer_topic.as_str()])?; + + let inner = Inner { + consumer, + topic: consumer_config.kafka_consumer_topic, + }; + Ok(Self { + inner: Arc::new(inner), + }) + } + + pub async fn json_recv(&self) -> Result<(T, Offset), RecvErr> + where + T: DeserializeOwned, + { + let message = self.inner.consumer.recv().await?; + + let offset = Offset { + handle: Arc::downgrade(&self.inner), + partition: message.partition(), + offset: message.offset(), + }; + + let Some(payload) = message.payload() else { + // We auto-store poison pills, panicking on failure + offset.store().unwrap(); + return Err(RecvErr::Empty); + }; + + let payload = match serde_json::from_slice(payload) { + Ok(p) => p, + Err(e) => { + // We auto-store poison pills, panicking on failure + offset.store().unwrap(); + return Err(RecvErr::Serde(e)); + } + }; + + Ok((payload, offset)) + } +} + +pub struct Offset { + handle: Weak, + partition: i32, + offset: i64, +} + +impl Offset { + pub fn store(self) -> Result<(), OffsetErr> { + let inner = self.handle.upgrade().ok_or(OffsetErr::Gone)?; + inner + .consumer + .store_offset(&inner.topic, self.partition, self.offset)?; + Ok(()) + } +} + +pub struct KafkaContext { + liveness: HealthHandle, +} + +impl rdkafka::ClientContext for KafkaContext { + fn stats(&self, _: rdkafka::Statistics) { + // Signal liveness, as the main rdkafka loop is running and calling us + self.liveness.report_healthy_blocking(); + + // TODO: Take stats recording pieces that we want from `capture-rs`. + } +} + +pub async fn create_kafka_producer( + config: &KafkaConfig, + liveness: HealthHandle, +) -> Result, KafkaError> { + let mut client_config = ClientConfig::new(); + client_config + .set("bootstrap.servers", &config.kafka_hosts) + .set("statistics.interval.ms", "10000") + .set("linger.ms", config.kafka_producer_linger_ms.to_string()) + .set( + "message.timeout.ms", + config.kafka_message_timeout_ms.to_string(), + ) + .set( + "compression.codec", + config.kafka_compression_codec.to_owned(), + ) + .set( + "queue.buffering.max.kbytes", + (config.kafka_producer_queue_mib * 1024).to_string(), + ); + + if config.kafka_tls { + client_config + .set("security.protocol", "ssl") + .set("enable.ssl.certificate.verification", "false"); + }; + + debug!("rdkafka configuration: {:?}", client_config); + let api: FutureProducer = + client_config.create_with_context(KafkaContext { liveness })?; + + // "Ping" the Kafka brokers by requesting metadata + match api + .client() + .fetch_metadata(None, std::time::Duration::from_secs(15)) + { + Ok(metadata) => { + info!( + "Successfully connected to Kafka brokers. Found {} topics.", + metadata.topics().len() + ); + } + Err(error) => { + error!("Failed to fetch metadata from Kafka brokers: {:?}", error); + return Err(error); + } + } + + Ok(api) +} + +#[derive(Error, Debug)] +pub enum KafkaProduceError { + #[error("failed to serialize: {error}")] + SerializationError { error: SerdeError }, + #[error("failed to produce to kafka: {error}")] + KafkaProduceError { error: KafkaError }, + #[error("failed to produce to kafka (timeout)")] + KafkaProduceCanceled, +} + +pub async fn send_iter_to_kafka( + kafka_producer: &FutureProducer, + topic: &str, + iter: impl IntoIterator, +) -> Result<(), KafkaProduceError> +where + T: Serialize, +{ + send_keyed_iter_to_kafka(kafka_producer, topic, |_| None, iter).await +} + +pub async fn send_keyed_iter_to_kafka( + kafka_producer: &FutureProducer, + topic: &str, + key_extractor: impl Fn(&T) -> Option, + iter: impl IntoIterator, +) -> Result<(), KafkaProduceError> +where + T: Serialize, +{ + let mut payloads = Vec::new(); + + for i in iter { + let key = key_extractor(&i); + let payload = serde_json::to_string(&i) + .map_err(|e| KafkaProduceError::SerializationError { error: e })?; + payloads.push((key, payload)); + } + + if payloads.is_empty() { + return Ok(()); + } + + let mut delivery_futures = Vec::new(); + + for (key, payload) in payloads { + match kafka_producer.send_result(FutureRecord { + topic, + payload: Some(&payload), + partition: None, + key: key.as_deref(), + timestamp: None, + headers: None, + }) { + Ok(future) => delivery_futures.push(future), + Err((error, _)) => return Err(KafkaProduceError::KafkaProduceError { error }), + } + } + + for result in join_all(delivery_futures).await { + match result { + Ok(Ok(_)) => {} + Ok(Err((error, _))) => return Err(KafkaProduceError::KafkaProduceError { error }), + Err(_) => { + // Cancelled due to timeout while retrying + return Err(KafkaProduceError::KafkaProduceCanceled); + } + } + } + + Ok(()) +} diff --git a/rust/cymbal/src/hack/mod.rs b/rust/cymbal/src/hack/mod.rs new file mode 100644 index 00000000000..56004115789 --- /dev/null +++ b/rust/cymbal/src/hack/mod.rs @@ -0,0 +1,3 @@ +pub mod kafka; + +// See comment in Cargo.toml, this module exists because of bugs we're working around diff --git a/rust/cymbal/src/lib.rs b/rust/cymbal/src/lib.rs index 471df70c0fe..d5be47d4f8f 100644 --- a/rust/cymbal/src/lib.rs +++ b/rust/cymbal/src/lib.rs @@ -13,6 +13,7 @@ pub mod config; pub mod error; pub mod fingerprinting; pub mod frames; +pub mod hack; pub mod issue_resolution; pub mod langs; pub mod metric_consts; @@ -59,7 +60,7 @@ pub async fn handle_event( Ok(event) } -fn get_props(event: &ClickHouseEvent) -> Result { +pub fn get_props(event: &ClickHouseEvent) -> Result { if event.event != "$exception" { return Err(EventError::WrongEventType(event.event.clone(), event.uuid)); } diff --git a/rust/cymbal/src/main.rs b/rust/cymbal/src/main.rs index 62a479ae042..aeef8632946 100644 --- a/rust/cymbal/src/main.rs +++ b/rust/cymbal/src/main.rs @@ -1,12 +1,12 @@ use std::{future::ready, sync::Arc}; use axum::{routing::get, Router}; -use common_kafka::{kafka_consumer::RecvErr, kafka_producer::send_keyed_iter_to_kafka}; use common_metrics::{serve, setup_metrics_routes}; use common_types::ClickHouseEvent; use cymbal::{ app_context::AppContext, config::Config, + hack::kafka::{send_keyed_iter_to_kafka, RecvErr}, handle_event, metric_consts::{ERRORS, EVENT_RECEIVED, MAIN_LOOP_TIME, STACK_PROCESSED}, };