use std::num::NonZeroU32; use time::Duration; use crate::common::*; use anyhow::Result; use assert_json_diff::assert_json_include; use capture::limiters::redis::QuotaResource; use reqwest::StatusCode; use serde_json::json; use uuid::Uuid; mod common; #[tokio::test] async fn it_captures_one_event() -> Result<()> { setup_tracing(); let token = random_string("token", 16); let distinct_id = random_string("id", 16); let main_topic = EphemeralTopic::new().await; let histo_topic = EphemeralTopic::new().await; let server = ServerHandle::for_topics(&main_topic, &histo_topic).await; let event = json!({ "token": token, "event": "testing", "distinct_id": distinct_id }); let res = server.capture_events(event.to_string()).await; assert_eq!(StatusCode::OK, res.status()); let event = main_topic.next_event()?; assert_json_include!( actual: event, expected: json!({ "token": token, "distinct_id": distinct_id }) ); Ok(()) } #[tokio::test] async fn it_captures_a_posthogjs_array() -> Result<()> { setup_tracing(); let token = random_string("token", 16); let distinct_id1 = random_string("id", 16); let distinct_id2 = random_string("id", 16); let main_topic = EphemeralTopic::new().await; let histo_topic = EphemeralTopic::new().await; let server = ServerHandle::for_topics(&main_topic, &histo_topic).await; let event = json!([{ "token": token, "event": "event1", "distinct_id": distinct_id1 },{ "token": token, "event": "event2", "distinct_id": distinct_id2 }]); let res = server.capture_events(event.to_string()).await; assert_eq!(StatusCode::OK, res.status()); assert_json_include!( actual: main_topic.next_event()?, expected: json!({ "token": token, "distinct_id": distinct_id1 }) ); assert_json_include!( actual: main_topic.next_event()?, expected: json!({ "token": token, "distinct_id": distinct_id2 }) ); Ok(()) } #[tokio::test] async fn it_captures_a_batch() -> Result<()> { setup_tracing(); let token = random_string("token", 16); let distinct_id1 = random_string("id", 16); let distinct_id2 = random_string("id", 16); let main_topic = EphemeralTopic::new().await; let histo_topic = EphemeralTopic::new().await; let server = ServerHandle::for_topics(&main_topic, &histo_topic).await; let event = json!({ "token": token, "batch": [{ "event": "event1", "distinct_id": distinct_id1 },{ "event": "event2", "distinct_id": distinct_id2 }] }); let res = server.capture_events(event.to_string()).await; assert_eq!(StatusCode::OK, res.status()); assert_json_include!( actual: main_topic.next_event()?, expected: json!({ "token": token, "distinct_id": distinct_id1 }) ); assert_json_include!( actual: main_topic.next_event()?, expected: json!({ "token": token, "distinct_id": distinct_id2 }) ); Ok(()) } #[tokio::test] async fn it_captures_a_historical_batch() -> Result<()> { setup_tracing(); let token = random_string("token", 16); let distinct_id1 = random_string("id", 16); let distinct_id2 = random_string("id", 16); let main_topic = EphemeralTopic::new().await; let histo_topic = EphemeralTopic::new().await; let server = ServerHandle::for_topics(&main_topic, &histo_topic).await; let event = json!({ "token": token, "historical_migration": true, "batch": [{ "event": "event1", "distinct_id": distinct_id1 },{ "event": "event2", "distinct_id": distinct_id2 }] }); let res = server.capture_events(event.to_string()).await; assert_eq!(StatusCode::OK, res.status()); assert_json_include!( actual: histo_topic.next_event()?, expected: json!({ "token": token, "distinct_id": distinct_id1 }) ); assert_json_include!( actual: histo_topic.next_event()?, expected: json!({ "token": token, "distinct_id": distinct_id2 }) ); Ok(()) } #[tokio::test] async fn it_overflows_events_on_burst() -> Result<()> { setup_tracing(); let token = random_string("token", 16); let distinct_id = random_string("id", 16); let topic = EphemeralTopic::new().await; let mut config = DEFAULT_CONFIG.clone(); config.kafka.kafka_topic = topic.topic_name().to_string(); config.overflow_enabled = true; config.overflow_burst_limit = NonZeroU32::new(2).unwrap(); config.overflow_per_second_limit = NonZeroU32::new(1).unwrap(); let server = ServerHandle::for_config(config).await; let event = json!([{ "token": token, "event": "event1", "distinct_id": distinct_id },{ "token": token, "event": "event2", "distinct_id": distinct_id },{ "token": token, "event": "event3", "distinct_id": distinct_id }]); let res = server.capture_events(event.to_string()).await; assert_eq!(StatusCode::OK, res.status()); assert_eq!( topic.next_message_key()?.unwrap(), format!("{}:{}", token, distinct_id) ); assert_eq!( topic.next_message_key()?.unwrap(), format!("{}:{}", token, distinct_id) ); assert_eq!(topic.next_message_key()?, None); Ok(()) } #[tokio::test] async fn it_does_not_overflow_team_with_different_ids() -> Result<()> { setup_tracing(); let token = random_string("token", 16); let distinct_id = random_string("id", 16); let distinct_id2 = random_string("id", 16); let topic = EphemeralTopic::new().await; let mut config = DEFAULT_CONFIG.clone(); config.kafka.kafka_topic = topic.topic_name().to_string(); config.overflow_enabled = true; config.overflow_burst_limit = NonZeroU32::new(1).unwrap(); config.overflow_per_second_limit = NonZeroU32::new(1).unwrap(); let server = ServerHandle::for_config(config).await; let event = json!([{ "token": token, "event": "event1", "distinct_id": distinct_id },{ "token": token, "event": "event2", "distinct_id": distinct_id2 }]); let res = server.capture_events(event.to_string()).await; assert_eq!(StatusCode::OK, res.status()); assert_eq!( topic.next_message_key()?.unwrap(), format!("{}:{}", token, distinct_id) ); assert_eq!( topic.next_message_key()?.unwrap(), format!("{}:{}", token, distinct_id2) ); Ok(()) } #[tokio::test] async fn it_skips_overflows_when_disabled() -> Result<()> { setup_tracing(); let token = random_string("token", 16); let distinct_id = random_string("id", 16); let topic = EphemeralTopic::new().await; let mut config = DEFAULT_CONFIG.clone(); config.kafka.kafka_topic = topic.topic_name().to_string(); config.overflow_enabled = false; config.overflow_burst_limit = NonZeroU32::new(2).unwrap(); config.overflow_per_second_limit = NonZeroU32::new(1).unwrap(); let server = ServerHandle::for_config(config).await; let event = json!([{ "token": token, "event": "event1", "distinct_id": distinct_id },{ "token": token, "event": "event2", "distinct_id": distinct_id },{ "token": token, "event": "event3", "distinct_id": distinct_id }]); let res = server.capture_events(event.to_string()).await; assert_eq!(StatusCode::OK, res.status()); assert_eq!( topic.next_message_key()?.unwrap(), format!("{}:{}", token, distinct_id) ); assert_eq!( topic.next_message_key()?.unwrap(), format!("{}:{}", token, distinct_id) ); // Should have triggered overflow, but has not assert_eq!( topic.next_message_key()?.unwrap(), format!("{}:{}", token, distinct_id) ); Ok(()) } #[tokio::test] async fn it_trims_distinct_id() -> Result<()> { setup_tracing(); let token = random_string("token", 16); let distinct_id1 = random_string("id", 200 - 3); let distinct_id2 = random_string("id", 222); let (trimmed_distinct_id2, _) = distinct_id2.split_at(200); // works because ascii chars let main_topic = EphemeralTopic::new().await; let histo_topic = EphemeralTopic::new().await; let server = ServerHandle::for_topics(&main_topic, &histo_topic).await; let event = json!([{ "token": token, "event": "event1", "distinct_id": distinct_id1 },{ "token": token, "event": "event2", "distinct_id": distinct_id2 }]); let res = server.capture_events(event.to_string()).await; assert_eq!(StatusCode::OK, res.status()); assert_json_include!( actual: main_topic.next_event()?, expected: json!({ "token": token, "distinct_id": distinct_id1 }) ); assert_json_include!( actual: main_topic.next_event()?, expected: json!({ "token": token, "distinct_id": trimmed_distinct_id2 }) ); Ok(()) } #[tokio::test] async fn it_applies_billing_limits() -> Result<()> { setup_tracing(); let token1 = random_string("token", 16); let token2 = random_string("token", 16); let token3 = random_string("token", 16); let distinct_id = random_string("id", 16); let topic = EphemeralTopic::new().await; // Setup billing limits: // - token1 limit is expired -> accept messages // - token2 limit is active -> drop messages // - token3 is not in redis -> accept by default let redis = PrefixedRedis::new().await; redis.add_billing_limit(QuotaResource::Events, &token1, Duration::seconds(-60)); redis.add_billing_limit(QuotaResource::Events, &token2, Duration::seconds(60)); let mut config = DEFAULT_CONFIG.clone(); config.redis_key_prefix = redis.key_prefix(); config.kafka.kafka_topic = topic.topic_name().to_string(); let server = ServerHandle::for_config(config).await; for payload in [ json!({ "token": token1, "batch": [{"event": "event1","distinct_id": distinct_id}] }), json!({ "token": token2, "batch": [{"event": "to drop","distinct_id": distinct_id}] }), json!({ "token": token3, "batch": [{"event": "event1","distinct_id": distinct_id}] }), ] { let res = server.capture_events(payload.to_string()).await; assert_eq!(StatusCode::OK, res.status()); } // Batches 1 and 3 go through, batch 2 is dropped assert_json_include!( actual: topic.next_event()?, expected: json!({ "token": token1, "distinct_id": distinct_id }) ); assert_json_include!( actual: topic.next_event()?, expected: json!({ "token": token3, "distinct_id": distinct_id }) ); Ok(()) } #[tokio::test] async fn it_routes_exceptions_and_heapmaps_to_separate_topics() -> Result<()> { setup_tracing(); let token = random_string("token", 16); let distinct_id = random_string("id", 16); let uuids: [Uuid; 5] = core::array::from_fn(|_| Uuid::now_v7()); let main_topic = EphemeralTopic::new().await; let warnings_topic = EphemeralTopic::new().await; let exceptions_topic = EphemeralTopic::new().await; let heatmaps_topic = EphemeralTopic::new().await; let mut config = DEFAULT_CONFIG.clone(); config.kafka.kafka_topic = main_topic.topic_name().to_string(); config.kafka.kafka_client_ingestion_warning_topic = warnings_topic.topic_name().to_string(); config.kafka.kafka_exceptions_topic = exceptions_topic.topic_name().to_string(); config.kafka.kafka_heatmaps_topic = heatmaps_topic.topic_name().to_string(); let server = ServerHandle::for_config(config).await; let event = json!([{ "token": token, "event": "$$client_ingestion_warning", "uuid": uuids[4], "distinct_id": distinct_id },{ "token": token, "event": "event1", "uuid": uuids[0], "distinct_id": distinct_id },{ "token": token, "event": "$$heatmap", "uuid": uuids[1], "distinct_id": distinct_id },{ "token": token, "event": "$exception", "uuid": uuids[2], "distinct_id": distinct_id },{ "token": token, "event": "event2", "uuid": uuids[3], "distinct_id": distinct_id }]); let res = server.capture_events(event.to_string()).await; assert_eq!(StatusCode::OK, res.status()); // Regular events are pushed to the main analytics topic assert_json_include!( actual: main_topic.next_event()?, expected: json!({ "token": token, "uuid": uuids[0], "distinct_id": distinct_id }) ); assert_json_include!( actual: main_topic.next_event()?, expected: json!({ "token": token, "uuid": uuids[3], "distinct_id": distinct_id }) ); main_topic.assert_empty(); // Special-cased events are pushed to their own topics assert_json_include!( actual: exceptions_topic.next_event()?, expected: json!({ "token": token, "uuid": uuids[2], "distinct_id": distinct_id }) ); exceptions_topic.assert_empty(); assert_json_include!( actual: heatmaps_topic.next_event()?, expected: json!({ "token": token, "uuid": uuids[1], "distinct_id": distinct_id }) ); heatmaps_topic.assert_empty(); assert_json_include!( actual: warnings_topic.next_event()?, expected: json!({ "token": token, "uuid": uuids[4], "distinct_id": distinct_id }) ); warnings_topic.assert_empty(); Ok(()) } #[tokio::test] async fn it_limits_non_batch_endpoints_to_2mb() -> Result<()> { setup_tracing(); let token = random_string("token", 16); let distinct_id = random_string("id", 16); let main_topic = EphemeralTopic::new().await; let histo_topic = EphemeralTopic::new().await; let server = ServerHandle::for_topics(&main_topic, &histo_topic).await; let ok_event = json!({ "token": token, "event": "event1", "distinct_id": distinct_id, "properties": { "big": "a".repeat(2_000_000) } }); let nok_event = json!({ "token": token, "event": "event2", "distinct_id": distinct_id, "properties": { "big": "a".repeat(2_100_000) } }); let res = server.capture_events(ok_event.to_string()).await; // The events are too large to go in kafka, so we get a maximum event size exceeded error, but that's ok, because that's a 400, not a 413 assert_eq!(StatusCode::BAD_REQUEST, res.status()); let res = server.capture_events(nok_event.to_string()).await; assert_eq!(StatusCode::PAYLOAD_TOO_LARGE, res.status()); Ok(()) } #[tokio::test] async fn it_limits_batch_endpoints_to_20mb() -> Result<()> { setup_tracing(); let token = random_string("token", 16); let distinct_id = random_string("id", 16); let main_topic = EphemeralTopic::new().await; let histo_topic = EphemeralTopic::new().await; let server = ServerHandle::for_topics(&main_topic, &histo_topic).await; // Notably here, rust capture actually handles all endpoints with the same function, so we don't actually // need to wrap these events in an array to send them to our batch endpoint let ok_event = json!({ "token": token, "event": "event1", "distinct_id": distinct_id, "properties": { "big": "a".repeat(20_000_000) } }); let nok_event = json!({ "token": token, "event": "event2", "distinct_id": distinct_id, "properties": { "big": "a".repeat(21_000_000) } }); let res = server.capture_to_batch(ok_event.to_string()).await; // The events are too large to go in kafka, so we get a maximum event size exceeded error, but that's ok, because that's a 400, not a 413 assert_eq!(StatusCode::BAD_REQUEST, res.status()); let res = server.capture_to_batch(nok_event.to_string()).await; assert_eq!(StatusCode::PAYLOAD_TOO_LARGE, res.status()); Ok(()) }