
fix(err): consumer hangs (#26288)

Oliver Browne 2024-11-19 18:45:25 +02:00 committed by GitHub
parent 53b600d37e
commit e92b38153b
10 changed files with 390 additions and 22 deletions

rust/Cargo.lock (generated, 35 lines changed)

@@ -1061,7 +1061,7 @@ dependencies = [
"opentelemetry-otlp",
"opentelemetry_sdk",
"rand",
"rdkafka",
"rdkafka 0.36.2",
"redis",
"reqwest 0.12.3",
"serde",
@@ -1155,7 +1155,7 @@ dependencies = [
"envconfig",
"futures",
"health",
"rdkafka",
"rdkafka 0.36.2",
"serde",
"serde_json",
"thiserror",
@@ -1366,7 +1366,7 @@ dependencies = [
"http 1.1.0",
"httpmock",
"rand",
"rdkafka",
"rdkafka 0.36.2",
"reqwest 0.12.3",
"serde",
"serde_json",
@@ -1392,7 +1392,7 @@ dependencies = [
"envconfig",
"eyre",
"health",
"rdkafka",
"rdkafka 0.36.2",
"serde_json",
"sqlx",
"time",
@@ -1426,16 +1426,16 @@ dependencies = [
"chrono",
"common-alloc",
"common-dns",
"common-kafka",
"common-metrics",
"common-types",
"envconfig",
"futures",
"health",
"httpmock",
"metrics",
"mockall",
"moka",
"rdkafka",
"rdkafka 0.35.0",
"reqwest 0.12.3",
"serde",
"serde_json",
@@ -2347,7 +2347,7 @@ dependencies = [
"health",
"hook-common",
"metrics",
"rdkafka",
"rdkafka 0.36.2",
"serde",
"serde_json",
"sqlx",
@@ -2375,7 +2375,7 @@ dependencies = [
"http 1.1.0",
"httpmock",
"metrics",
"rdkafka",
"rdkafka 0.36.2",
"reqwest 0.12.3",
"serde_json",
"sqlx",
@@ -3841,6 +3841,25 @@ dependencies = [
"bitflags 2.4.2",
]
+ [[package]]
+ name = "rdkafka"
+ version = "0.35.0"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "f16c17f411935214a5870e40aff9291f8b40a73e97bf8de29e5959c473d5ef33"
+ dependencies = [
+ "futures-channel",
+ "futures-util",
+ "libc",
+ "log",
+ "rdkafka-sys",
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "slab",
+ "tokio",
+ "tracing",
+ ]
[[package]]
name = "rdkafka"
version = "0.36.2"

rust/cymbal/Cargo.toml

@@ -4,7 +4,18 @@ version = "0.1.0"
edition = "2021"
[dependencies]
- rdkafka = { workspace = true }
+ # We use an older version of rdkafka for cymbal specifically because the newer version
+ # has a bug which causes consumers to hang when offset auto-storing is disabled and the
+ # program is producing and consuming at the same time. Cymbal is the only rust service
+ # that does this, and 0.35 is significantly lower-performance than 0.36, so rather than
+ # rolling back all other rust services (particularly capture and propdefs), we just have
+ # cymbal use the older version by overriding the workspace setting. The tracking issue
+ # for this is: https://github.com/fede1024/rust-rdkafka/issues/638
+ rdkafka = { version = "0.35.0", features = ["cmake-build", "ssl", "tracing"] }
+ # We don't rely on common-kafka due to the bug mentioned above
+ # common-kafka = { path = "../common/kafka" }
+ # This does force us to take a direct dependency on futures (for join_all)
+ futures = { workspace = true }
tokio = { workspace = true }
envconfig = { workspace = true }
tracing = { workspace = true }
@@ -14,7 +25,6 @@ axum = { workspace = true }
metrics = { workspace = true }
common-metrics = { path = "../common/metrics" }
common-alloc = { path = "../common/alloc" }
- common-kafka = { path = "../common/kafka" }
common-types = { path = "../common/types" }
common-dns = { path = "../common/dns" }
thiserror = { workspace = true }

rust/cymbal/src/app_context.rs

@@ -1,8 +1,4 @@
use aws_config::{BehaviorVersion, Region};
- use common_kafka::{
-     kafka_consumer::SingleTopicConsumer, kafka_producer::create_kafka_producer,
-     kafka_producer::KafkaContext,
- };
use health::{HealthHandle, HealthRegistry};
use rdkafka::producer::FutureProducer;
use sqlx::{postgres::PgPoolOptions, PgPool};
@@ -14,6 +10,7 @@ use crate::{
config::Config,
error::UnhandledError,
frames::resolver::Resolver,
+ hack::kafka::{create_kafka_producer, KafkaContext, SingleTopicConsumer},
symbol_store::{
caching::{Caching, SymbolSetCache},
concurrency,

rust/cymbal/src/bin/ — new test-producer binary (33 lines)

@@ -0,0 +1,33 @@
use std::sync::Arc;
use common_types::ClickHouseEvent;
use cymbal::{
get_props,
hack::kafka::{create_kafka_producer, send_iter_to_kafka, KafkaConfig},
};
use envconfig::Envconfig;
use health::HealthRegistry;
const EXCEPTION_DATA: &str = include_str!("../../tests/static/raw_ch_exception_list.json");
#[tokio::main]
async fn main() {
let config = KafkaConfig::init_from_env().unwrap();
let health = Arc::new(HealthRegistry::new("test"));
let handle = health
.register("rdkafka".to_string(), std::time::Duration::from_secs(30))
.await;
let producer = create_kafka_producer(&config, handle).await.unwrap();
let exception: ClickHouseEvent = serde_json::from_str(EXCEPTION_DATA).unwrap();
let exceptions = (0..10000).map(|_| exception.clone()).collect::<Vec<_>>();
get_props(&exception).unwrap();
loop {
println!("Sending {} exceptions to kafka", exceptions.len());
send_iter_to_kafka(&producer, "exception_symbolification_events", &exceptions)
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
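Together with run.sh below, this binary reads like a local reproduction harness: it continuously floods exception_symbolification_events with cloned exception events, so a concurrently running cymbal instance is consuming and producing at the same time — the combination implicated in the rdkafka 0.36 hang.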

rust/cymbal/src/bin/run.sh (new executable file, 7 lines)

@@ -0,0 +1,7 @@
export KAFKA_CONSUMER_GROUP="cymbal"
export KAFKA_CONSUMER_TOPIC="exception_symbolification_events"
export OBJECT_STORAGE_BUCKET="posthog"
export OBJECT_STORAGE_ACCESS_KEY_ID="object_storage_root_user"
export OBJECT_STORAGE_SECRET_ACCESS_KEY="object_storage_root_password"
cargo run --bin cymbal

rust/cymbal/src/config.rs

@@ -1,6 +1,7 @@
- use common_kafka::config::{ConsumerConfig, KafkaConfig};
use envconfig::Envconfig;
+ use crate::hack::kafka::{ConsumerConfig, KafkaConfig};
#[derive(Envconfig, Clone)]
pub struct Config {
#[envconfig(from = "BIND_HOST", default = "::")]
@@ -70,11 +71,6 @@ pub struct Config {
pub frame_cache_ttl_seconds: u64,
}
- pub enum AwsRegion {
-     USEast1,
-     USWest1,
- }
impl Config {
pub fn init_with_defaults() -> Result<Self, envconfig::Error> {
ConsumerConfig::set_defaults("error-tracking-rs", "exception_symbolification_events");
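The ordering here matters: set_defaults only writes KAFKA_CONSUMER_GROUP and KAFKA_CONSUMER_TOPIC when they are absent, so exports like the ones in run.sh win over the baked-in defaults. A minimal sketch of that behaviour (a hypothetical test harness, not code from this commit):

use cymbal::hack::kafka::ConsumerConfig;
use envconfig::Envconfig;

fn main() {
    // run.sh exports this before launching cymbal...
    std::env::set_var("KAFKA_CONSUMER_GROUP", "cymbal");
    // ...so set_defaults leaves it alone and only fills in the missing topic var.
    ConsumerConfig::set_defaults("error-tracking-rs", "exception_symbolification_events");
    let cfg = ConsumerConfig::init_from_env().unwrap();
    assert_eq!(cfg.kafka_consumer_group, "cymbal");
    assert_eq!(cfg.kafka_consumer_topic, "exception_symbolification_events");
}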

rust/cymbal/src/hack/kafka.rs (new file)

@@ -0,0 +1,302 @@
use envconfig::Envconfig;
use futures::future::join_all;
use health::HealthHandle;
use rdkafka::{
consumer::{Consumer, StreamConsumer},
error::KafkaError,
producer::{FutureProducer, FutureRecord, Producer},
ClientConfig, Message,
};
use serde::{de::DeserializeOwned, Serialize};
use serde_json::error::Error as SerdeError;
use std::sync::{Arc, Weak};
use thiserror::Error;
use tracing::{debug, error, info};
/// This module is a straightforward copy-paste of common-kafka, due to the issue described in cymbal's Cargo.toml
/// It should be deleted ASAP, pending https://github.com/fede1024/rust-rdkafka/issues/638 being resolved
#[derive(Envconfig, Clone)]
pub struct KafkaConfig {
#[envconfig(default = "20")]
pub kafka_producer_linger_ms: u32, // Maximum time between producer batches during low traffic
#[envconfig(default = "400")]
pub kafka_producer_queue_mib: u32, // Size of the in-memory producer queue in mebibytes
#[envconfig(default = "20000")]
pub kafka_message_timeout_ms: u32, // Time before we stop retrying producing a message: 20 seconds
#[envconfig(default = "none")]
pub kafka_compression_codec: String, // none, gzip, snappy, lz4, zstd
#[envconfig(default = "false")]
pub kafka_tls: bool,
#[envconfig(default = "localhost:9092")]
pub kafka_hosts: String,
}
#[derive(Envconfig, Clone)]
pub struct ConsumerConfig {
pub kafka_consumer_group: String,
pub kafka_consumer_topic: String,
}
impl ConsumerConfig {
/// Because the consumer config is application specific, we
/// can't set good defaults in the derive macro, so we expose a way
/// for users to set them here before init'ing their main config struct
pub fn set_defaults(consumer_group: &str, consumer_topic: &str) {
if std::env::var("KAFKA_CONSUMER_GROUP").is_err() {
std::env::set_var("KAFKA_CONSUMER_GROUP", consumer_group);
};
if std::env::var("KAFKA_CONSUMER_TOPIC").is_err() {
std::env::set_var("KAFKA_CONSUMER_TOPIC", consumer_topic);
};
}
}
#[derive(Clone)]
pub struct SingleTopicConsumer {
inner: Arc<Inner>,
}
struct Inner {
consumer: StreamConsumer,
topic: String,
}
#[derive(Debug, thiserror::Error)]
pub enum RecvErr {
#[error("Kafka error: {0}")]
Kafka(#[from] KafkaError),
#[error("Serde error: {0}")]
Serde(#[from] serde_json::Error),
#[error("Received empty payload")]
Empty,
}
#[derive(Debug, thiserror::Error)]
pub enum OffsetErr {
#[error("Kafka error: {0}")]
Kafka(#[from] KafkaError),
#[error("Consumer gone")]
Gone,
}
impl SingleTopicConsumer {
pub fn new(
common_config: KafkaConfig,
consumer_config: ConsumerConfig,
) -> Result<Self, KafkaError> {
let mut client_config = ClientConfig::new();
client_config
.set("bootstrap.servers", &common_config.kafka_hosts)
.set("statistics.interval.ms", "10000")
.set("group.id", consumer_config.kafka_consumer_group);
client_config.set("enable.auto.offset.store", "false");
if common_config.kafka_tls {
client_config
.set("security.protocol", "ssl")
.set("enable.ssl.certificate.verification", "false");
};
let consumer: StreamConsumer = client_config.create()?;
consumer.subscribe(&[consumer_config.kafka_consumer_topic.as_str()])?;
let inner = Inner {
consumer,
topic: consumer_config.kafka_consumer_topic,
};
Ok(Self {
inner: Arc::new(inner),
})
}
pub async fn json_recv<T>(&self) -> Result<(T, Offset), RecvErr>
where
T: DeserializeOwned,
{
let message = self.inner.consumer.recv().await?;
let offset = Offset {
handle: Arc::downgrade(&self.inner),
partition: message.partition(),
offset: message.offset(),
};
let Some(payload) = message.payload() else {
// We auto-store poison pills, panicking on failure
offset.store().unwrap();
return Err(RecvErr::Empty);
};
let payload = match serde_json::from_slice(payload) {
Ok(p) => p,
Err(e) => {
// We auto-store poison pills, panicking on failure
offset.store().unwrap();
return Err(RecvErr::Serde(e));
}
};
Ok((payload, offset))
}
}
pub struct Offset {
handle: Weak<Inner>,
partition: i32,
offset: i64,
}
impl Offset {
pub fn store(self) -> Result<(), OffsetErr> {
let inner = self.handle.upgrade().ok_or(OffsetErr::Gone)?;
inner
.consumer
.store_offset(&inner.topic, self.partition, self.offset)?;
Ok(())
}
}
pub struct KafkaContext {
liveness: HealthHandle,
}
impl rdkafka::ClientContext for KafkaContext {
fn stats(&self, _: rdkafka::Statistics) {
// Signal liveness, as the main rdkafka loop is running and calling us
self.liveness.report_healthy_blocking();
// TODO: Take stats recording pieces that we want from `capture-rs`.
}
}
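// Liveness rides on librdkafka's statistics callback: with
// "statistics.interval.ms" set to 10000 below, stats() fires roughly every
// 10 seconds, comfortably inside e.g. the 30-second health deadline the
// test producer binary registers.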
pub async fn create_kafka_producer(
config: &KafkaConfig,
liveness: HealthHandle,
) -> Result<FutureProducer<KafkaContext>, KafkaError> {
let mut client_config = ClientConfig::new();
client_config
.set("bootstrap.servers", &config.kafka_hosts)
.set("statistics.interval.ms", "10000")
.set("linger.ms", config.kafka_producer_linger_ms.to_string())
.set(
"message.timeout.ms",
config.kafka_message_timeout_ms.to_string(),
)
.set(
"compression.codec",
config.kafka_compression_codec.to_owned(),
)
.set(
"queue.buffering.max.kbytes",
(config.kafka_producer_queue_mib * 1024).to_string(),
);
if config.kafka_tls {
client_config
.set("security.protocol", "ssl")
.set("enable.ssl.certificate.verification", "false");
};
debug!("rdkafka configuration: {:?}", client_config);
let api: FutureProducer<KafkaContext> =
client_config.create_with_context(KafkaContext { liveness })?;
// "Ping" the Kafka brokers by requesting metadata
match api
.client()
.fetch_metadata(None, std::time::Duration::from_secs(15))
{
Ok(metadata) => {
info!(
"Successfully connected to Kafka brokers. Found {} topics.",
metadata.topics().len()
);
}
Err(error) => {
error!("Failed to fetch metadata from Kafka brokers: {:?}", error);
return Err(error);
}
}
Ok(api)
}
#[derive(Error, Debug)]
pub enum KafkaProduceError {
#[error("failed to serialize: {error}")]
SerializationError { error: SerdeError },
#[error("failed to produce to kafka: {error}")]
KafkaProduceError { error: KafkaError },
#[error("failed to produce to kafka (timeout)")]
KafkaProduceCanceled,
}
pub async fn send_iter_to_kafka<T>(
kafka_producer: &FutureProducer<KafkaContext>,
topic: &str,
iter: impl IntoIterator<Item = T>,
) -> Result<(), KafkaProduceError>
where
T: Serialize,
{
send_keyed_iter_to_kafka(kafka_producer, topic, |_| None, iter).await
}
pub async fn send_keyed_iter_to_kafka<T>(
kafka_producer: &FutureProducer<KafkaContext>,
topic: &str,
key_extractor: impl Fn(&T) -> Option<String>,
iter: impl IntoIterator<Item = T>,
) -> Result<(), KafkaProduceError>
where
T: Serialize,
{
let mut payloads = Vec::new();
for i in iter {
let key = key_extractor(&i);
let payload = serde_json::to_string(&i)
.map_err(|e| KafkaProduceError::SerializationError { error: e })?;
payloads.push((key, payload));
}
if payloads.is_empty() {
return Ok(());
}
let mut delivery_futures = Vec::new();
for (key, payload) in payloads {
match kafka_producer.send_result(FutureRecord {
topic,
payload: Some(&payload),
partition: None,
key: key.as_deref(),
timestamp: None,
headers: None,
}) {
Ok(future) => delivery_futures.push(future),
Err((error, _)) => return Err(KafkaProduceError::KafkaProduceError { error }),
}
}
for result in join_all(delivery_futures).await {
match result {
Ok(Ok(_)) => {}
Ok(Err((error, _))) => return Err(KafkaProduceError::KafkaProduceError { error }),
Err(_) => {
// Cancelled due to timeout while retrying
return Err(KafkaProduceError::KafkaProduceCanceled);
}
}
}
Ok(())
}
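Since this vendored module mirrors the common-kafka API, a short consumer-loop sketch may help show how SingleTopicConsumer and Offset fit together (hypothetical event type and error handling, not cymbal's actual loop):

use cymbal::hack::kafka::{ConsumerConfig, KafkaConfig, RecvErr, SingleTopicConsumer};
use envconfig::Envconfig;

#[tokio::main]
async fn main() {
    ConsumerConfig::set_defaults("cymbal", "exception_symbolification_events");
    let consumer = SingleTopicConsumer::new(
        KafkaConfig::init_from_env().unwrap(),
        ConsumerConfig::init_from_env().unwrap(),
    )
    .unwrap();
    loop {
        // json_recv deserializes the payload and hands back the offset; nothing
        // is stored until we explicitly do so after handling the message.
        let (event, offset): (serde_json::Value, _) = match consumer.json_recv().await {
            Ok(pair) => pair,
            Err(RecvErr::Empty | RecvErr::Serde(_)) => continue, // poison pills already stored
            Err(RecvErr::Kafka(e)) => panic!("kafka error: {e}"),
        };
        let _ = event; // ... handle the event here ...
        offset.store().unwrap();
    }
}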

rust/cymbal/src/hack/mod.rs (new file)

@@ -0,0 +1,3 @@
pub mod kafka;
// See comment in Cargo.toml, this module exists because of bugs we're working around

rust/cymbal/src/lib.rs

@@ -13,6 +13,7 @@ pub mod config;
pub mod error;
pub mod fingerprinting;
pub mod frames;
+ pub mod hack;
pub mod issue_resolution;
pub mod langs;
pub mod metric_consts;
@@ -59,7 +60,7 @@ pub async fn handle_event(
Ok(event)
}
- fn get_props(event: &ClickHouseEvent) -> Result<RawErrProps, EventError> {
+ pub fn get_props(event: &ClickHouseEvent) -> Result<RawErrProps, EventError> {
if event.event != "$exception" {
return Err(EventError::WrongEventType(event.event.clone(), event.uuid));
}

rust/cymbal/src/main.rs

@@ -1,12 +1,12 @@
use std::{future::ready, sync::Arc};
use axum::{routing::get, Router};
- use common_kafka::{kafka_consumer::RecvErr, kafka_producer::send_keyed_iter_to_kafka};
use common_metrics::{serve, setup_metrics_routes};
use common_types::ClickHouseEvent;
use cymbal::{
app_context::AppContext,
config::Config,
+ hack::kafka::{send_keyed_iter_to_kafka, RecvErr},
handle_event,
metric_consts::{ERRORS, EVENT_RECEIVED, MAIN_LOOP_TIME, STACK_PROCESSED},
};
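
The body of main.rs is not shown in this view, but the shape it implies — receive, handle, produce results, then store the offset by hand — is the pattern the Cargo.toml comment says hangs on rdkafka 0.36. A reduced sketch under those assumptions (illustrative names and topic, not the actual loop):

use cymbal::hack::kafka::{send_iter_to_kafka, KafkaContext, SingleTopicConsumer};
use rdkafka::producer::FutureProducer;

async fn run_loop(consumer: SingleTopicConsumer, producer: FutureProducer<KafkaContext>) {
    loop {
        let (event, offset): (common_types::ClickHouseEvent, _) =
            consumer.json_recv().await.expect("recv");
        // The same process produces while it consumes — the combination
        // implicated in rust-rdkafka#638 when auto offset storing is off.
        send_iter_to_kafka(&producer, "some_output_topic", [event])
            .await
            .expect("produce");
        // Store the offset only once the produce has been acknowledged.
        offset.store().expect("store offset");
    }
}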