From afeb5529ceb67cbe78af7c0d05999353e205bed5 Mon Sep 17 00:00:00 2001
From: Oliver Browne
Date: Mon, 18 Nov 2024 14:12:48 +0200
Subject: [PATCH] fix(err): assorted perf/concurrency things (#26235)

---
 rust/cymbal/src/app_context.rs              |  8 ++-
 rust/cymbal/src/lib.rs                      | 55 +++++++-------
 rust/cymbal/src/main.rs                     |  2 +-
 rust/cymbal/src/symbol_store/caching.rs     | 24 +++++--
 rust/cymbal/src/symbol_store/concurrency.rs | 79 +++++++++++++++++++++
 rust/cymbal/src/symbol_store/mod.rs         |  1 +
 rust/cymbal/src/symbol_store/saving.rs      |  4 ++
 7 files changed, 140 insertions(+), 33 deletions(-)
 create mode 100644 rust/cymbal/src/symbol_store/concurrency.rs

diff --git a/rust/cymbal/src/app_context.rs b/rust/cymbal/src/app_context.rs
index dc0c513f332..ef96c84fa7a 100644
--- a/rust/cymbal/src/app_context.rs
+++ b/rust/cymbal/src/app_context.rs
@@ -16,6 +16,7 @@ use crate::{
     frames::resolver::Resolver,
     symbol_store::{
         caching::{Caching, SymbolSetCache},
+        concurrency,
         saving::Saving,
         sourcemap::SourcemapProvider,
         Catalog, S3Client,
@@ -80,13 +81,18 @@ impl AppContext {
             config.ss_prefix.clone(),
         );
         let caching_smp = Caching::new(saving_smp, ss_cache);
+        // We want to fetch each sourcemap from the outside world
+        // exactly once and, if it isn't in the cache, load/parse
+        // it from s3 exactly once too. Limiting the per-symbol-set
+        // reference concurrency to 1 ensures this.
+        let limited_smp = concurrency::AtMostOne::new(caching_smp);
 
         info!(
             "AppContext initialized, subscribed to topic {}",
             config.consumer.kafka_consumer_topic
         );
 
-        let catalog = Catalog::new(caching_smp);
+        let catalog = Catalog::new(limited_smp);
         let resolver = Resolver::new(config);
 
         Ok(Self {
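For orientation, the provider stack this change wires up looks roughly like the sketch below (outermost wrapper last). `saving_smp` and `ss_cache` are assumed to be built as in the surrounding code; the snippet illustrates the ordering rather than the exact production construction.

// Sketch only - the ordering is the point. AtMostOne sits outside the cache, so
// concurrent lookups for the same symbol set ref are serialised *before* they reach
// Caching: the first caller misses, fetches/parses and fills the cache, and every
// later caller for that ref hits the cache instead of re-fetching.
let caching_smp = Caching::new(saving_smp, ss_cache);
let limited_smp = concurrency::AtMostOne::new(caching_smp);
let catalog = Catalog::new(limited_smp);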
diff --git a/rust/cymbal/src/lib.rs b/rust/cymbal/src/lib.rs
index 35386be8cca..09ea0a73f49 100644
--- a/rust/cymbal/src/lib.rs
+++ b/rust/cymbal/src/lib.rs
@@ -1,4 +1,4 @@
-use std::collections::HashMap;
+use std::sync::Arc;
 
 use app_context::AppContext;
 use common_types::ClickHouseEvent;
@@ -20,7 +20,7 @@ pub mod symbol_store;
 pub mod types;
 
 pub async fn handle_event(
-    context: &AppContext,
+    context: Arc<AppContext>,
     mut event: ClickHouseEvent,
 ) -> Result<Option<ClickHouseEvent>, UnhandledError> {
     let mut props = match get_props(&event) {
@@ -45,7 +45,7 @@
         // If we get an unhandled error during exception processing, we return an error, which should
         // cause the caller to drop the offset without storing it - unhandled exceptions indicate
         // a dependency is down, or some bug, and we want to take lag in those situations.
-        results.push(process_exception(context, event.team_id, exception).await?);
+        results.push(process_exception(context.clone(), event.team_id, exception).await?);
     }
 
     let fingerprint = generate_fingerprint(&results);
@@ -79,7 +79,7 @@ fn get_props(event: &ClickHouseEvent) -> Result {
 }
 
 async fn process_exception(
-    context: &AppContext,
+    context: Arc<AppContext>,
     team_id: i32,
     mut e: Exception,
 ) -> Result<Exception, UnhandledError> {
@@ -96,40 +96,43 @@ async fn process_exception(
         return Ok(e);
     }
 
-    let mut results = Vec::with_capacity(frames.len());
+    let mut handles = Vec::with_capacity(frames.len());
+    let mut resolved_frames = Vec::with_capacity(frames.len());
 
-    // Cluster the frames by symbol set
-    // TODO - we really want to cluster across exceptions (and even across events),
-    // rather than just within a single exception
-    let mut groups = HashMap::new();
-    for (i, frame) in frames.into_iter().enumerate() {
-        let group = groups
-            .entry(frame.symbol_set_ref())
-            .or_insert_with(Vec::new);
-        group.push((i, frame));
-    }
-
-    for (_, frames) in groups.into_iter() {
-        for (i, frame) in frames {
-            let resolved_frame = context
+    for frame in frames.into_iter() {
+        let context = context.clone();
+        // Spawn a concurrent task for resolving every frame - we're careful elsewhere to
+        // ensure this kind of concurrency is fine, although this "throw it at the wall"
+        // data flow structure is pretty questionable. Once we switch to handling more than
+        // 1 event at a time, we should re-group frames into associated groups and then
+        // process those groups in order (but the individual frames in them can still be
+        // thrown at the wall), with some cross-group concurrency.
+        handles.push(tokio::spawn(async move {
+            context
                 .resolver
                 .resolve(&frame, team_id, &context.pool, &context.catalog)
-                .await?;
-            results.push((i, resolved_frame));
-        }
+                .await
+        }));
     }
 
-    results.sort_unstable_by_key(|(i, _)| *i);
+    // Collect the results
+    for handle in handles {
+        // JoinHandles wrap the returned type in a Result, because if the task panics,
+        // tokio catches it and returns an error. If any of our tasks panicked, we want
+        // to propagate that panic, so we unwrap the outer Result here.
+        let res = handle.await.unwrap()?;
+        resolved_frames.push(res)
+    }
 
     e.stack = Some(Stacktrace::Resolved {
-        frames: results.into_iter().map(|(_, frame)| frame).collect(),
+        frames: resolved_frames,
     });
 
     Ok(e)
 }
 
-// This is stupidly expensive, since it round-trips the event through JSON, lol. We should change ClickhouseEvent to only do serde at the
-// edges
+// This is expensive, since it round-trips the event through JSON.
+// We could maybe change ClickhouseEvent to only do serde at the edges.
 pub fn add_error_to_event(
     event: &mut ClickHouseEvent,
     e: impl ToString,
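The spawn-and-join flow above is worth seeing in isolation. Below is a minimal, self-contained sketch of the same pattern, with a hypothetical `resolve_one` standing in for `Resolver::resolve` and `String` for the real error type: each spawned task returns a `Result`, the `JoinHandle` wraps that in a second `Result`, and the collection loop unwraps the outer layer to re-raise panics while `?` propagates ordinary errors.

// Hypothetical stand-in for Resolver::resolve - any async, fallible piece of work
// that is Send + 'static fits the same pattern.
async fn resolve_one(frame_id: u32) -> Result<String, String> {
    if frame_id == 42 {
        return Err(format!("could not resolve frame {frame_id}"));
    }
    Ok(format!("resolved frame {frame_id}"))
}

async fn resolve_all(frame_ids: Vec<u32>) -> Result<Vec<String>, String> {
    let mut handles = Vec::with_capacity(frame_ids.len());
    for frame_id in frame_ids {
        // One task per frame; the tasks start running as soon as they are spawned.
        handles.push(tokio::spawn(resolve_one(frame_id)));
    }

    let mut resolved = Vec::with_capacity(handles.len());
    for handle in handles {
        // The outer Result comes from the JoinHandle: Err means the task panicked (or was
        // cancelled), and unwrapping re-raises that. The inner Result is the resolver's
        // own error, which `?` propagates as usual.
        resolved.push(handle.await.unwrap()?);
    }
    // Awaiting the handles in spawn order keeps results aligned with the input order,
    // which is why the old index-and-sort bookkeeping is no longer needed.
    Ok(resolved)
}

#[tokio::main]
async fn main() {
    println!("{:?}", resolve_all(vec![1, 2, 3]).await);
}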
diff --git a/rust/cymbal/src/main.rs b/rust/cymbal/src/main.rs
index 8955a976761..ef96e8e490a 100644
--- a/rust/cymbal/src/main.rs
+++ b/rust/cymbal/src/main.rs
@@ -78,7 +78,7 @@ async fn main() {
         };
 
         metrics::counter!(EVENT_RECEIVED).increment(1);
 
-        let _processed_event = match handle_event(&context, event).await {
+        let _processed_event = match handle_event(context.clone(), event).await {
             Ok(r) => {
                 offset.store().unwrap();
                 r
diff --git a/rust/cymbal/src/symbol_store/caching.rs b/rust/cymbal/src/symbol_store/caching.rs
index 2fd24a2e9f3..5cab851fdac 100644
--- a/rust/cymbal/src/symbol_store/caching.rs
+++ b/rust/cymbal/src/symbol_store/caching.rs
@@ -12,6 +12,8 @@ use crate::{
 
 use super::{saving::Saveable, Fetcher, Parser, Provider};
 
+// This is a type-specific symbol provider layer, designed to
+// wrap some inner provider and provide a type-safe caching layer.
 pub struct Caching<P> {
     inner: P,
     cache: Arc<Mutex<SymbolSetCache>>,
@@ -42,16 +44,30 @@ where
             return Ok(set);
         }
         metrics::counter!(STORE_CACHE_MISSES).increment(1);
+        drop(cache);
+
+        // Do the fetch, not holding the lock across it to allow
+        // concurrent fetches to occur (de-duping fetches is
+        // up to the caller of `lookup`, since relying on the
+        // cache to do it means assuming the caching layer is
+        // the outer layer, which is not something the interface
+        // guarantees)
         let found = self.inner.fetch(team_id, r).await?;
         let bytes = found.byte_count();
         let parsed = self.inner.parse(found).await?;
+
+        let mut cache = self.cache.lock().await; // Re-acquire the cache-wide lock to insert, dropping the ref_lock
+        let parsed = Arc::new(parsed);
         cache.insert(cache_key, parsed.clone(), bytes);
         Ok(parsed)
     }
 }
 
+// This is a cache shared across multiple symbol set providers, through the `Caching` above,
+// such that two totally different "layers" can share an underlying "pool" of cache space. This
+// is injected into the `Caching` layer at construct time, to allow this sharing across multiple
+// provider layer "stacks" within the catalog.
 pub struct SymbolSetCache {
     // We expect this cache to consist of few, but large, items.
     // TODO - handle cases where two CachedSymbolSets have identical keys but different types
@@ -113,16 +129,14 @@ impl SymbolSetCache {
         // remove them in a separate pass.
         let mut to_remove = vec![];
         while self.held_bytes > self.max_bytes && !vals.is_empty() {
-            // We can unwrap here because we know we're not empty from the line above
+            // We can unwrap here because we know we're not empty from the line above (and
+            // really, even the !empty check could be skipped - if held_bytes is non-zero, we
+            // must have at least one element in vals)
             let (to_remove_key, to_remove_val) = vals.pop().unwrap();
             self.held_bytes -= to_remove_val.bytes;
             to_remove.push(to_remove_key.clone());
         }
 
-        for key in to_remove {
-            self.cached.remove(&key);
-        }
-
         metrics::gauge!(STORE_CACHED_BYTES).set(self.held_bytes as f64);
     }
 }
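The locking pattern in `Caching::lookup` above is the subtle bit: the cache-wide mutex is released before the slow fetch/parse and re-acquired only for the insert. Here is a stripped-down sketch of that shape, with a hypothetical `fetch_remote` and plain `String` values standing in for parsed symbol sets; it illustrates the trade-off rather than the patch's code.

use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;

// Hypothetical slow fetch standing in for the S3/sourcemap round trip.
async fn fetch_remote(key: &str) -> String {
    format!("contents of {key}")
}

async fn lookup(cache: Arc<Mutex<HashMap<String, Arc<String>>>>, key: &str) -> Arc<String> {
    let guard = cache.lock().await;
    if let Some(hit) = guard.get(key) {
        return hit.clone();
    }
    // Release the cache-wide lock before the slow part, so a miss on one key doesn't
    // block hits (or misses) on every other key. The cost is that two concurrent
    // misses for the *same* key would both fetch - de-duplicating that is left to an
    // outer layer, like AtMostOne in this patch.
    drop(guard);

    let fetched = Arc::new(fetch_remote(key).await);

    // Re-acquire only for the cheap insert.
    cache.lock().await.insert(key.to_string(), fetched.clone());
    fetched
}

#[tokio::main]
async fn main() {
    let cache = Arc::new(Mutex::new(HashMap::new()));
    println!("{}", lookup(cache.clone(), "chunk-abc").await);
    println!("{}", lookup(cache, "chunk-abc").await); // second call is a cache hit
}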

diff --git a/rust/cymbal/src/symbol_store/concurrency.rs b/rust/cymbal/src/symbol_store/concurrency.rs
new file mode 100644
index 00000000000..466bfa8fce2
--- /dev/null
+++ b/rust/cymbal/src/symbol_store/concurrency.rs
@@ -0,0 +1,79 @@
+use std::{
+    collections::HashMap,
+    sync::{Arc, Weak},
+};
+
+use axum::async_trait;
+use tokio::sync::{Mutex, OwnedMutexGuard};
+
+use crate::error::Error;
+
+use super::Provider;
+
+// Limits the number of concurrent lookups
+// for a given symbol set to 1. Note this places
+// no concurrency limit /across/ different symbol
+// sets, and places no limit on the number of users
+// using the returned symbol set concurrently. Designed
+// to wrap the caching/saving layers, allowing us to
+// ensure we only fetch any given symbol set from the
+// outside world exactly once.
+pub struct AtMostOne<P> {
+    pub inner: P,
+    limiters: Mutex<HashMap<String, Weak<Mutex<()>>>>,
+}
+
+impl<P> AtMostOne<P> {
+    pub fn new(inner: P) -> Self {
+        Self {
+            inner,
+            limiters: Default::default(),
+        }
+    }
+
+    // This needs to be async even though all it does is take a lock because
+    // the returned owned guard can be (and is) held across an await point, so
+    // if this was a sync mutex it'd block the executor. It so happens that the
+    // std library Mutex doesn't provide lock_owned anyway, so we'd have to pull
+    // in a new dependency if we wanted to write a sync version of this, but
+    // that's secondary to it actually needing to be async.
+    pub async fn acquire(&self, key: impl ToString) -> OwnedMutexGuard<()> {
+        let key = key.to_string();
+        let mut state = self.limiters.lock().await;
+        let limiter = state.entry(key).or_default();
+
+        if let Some(lock) = limiter.upgrade() {
+            // If there's already a mutex in our shared state for this particular
+            // source ref, drop the global lock, and wait for the underlying source
+            // ref to be freed up.
+            drop(state);
+            lock.lock_owned().await
+        } else {
+            // If there's no mutex in our shared state for this particular source ref,
+            // create one, acquire it, put a Weak to it in the shared state, and return
+            // the owning mutex guard (and therefore the underlying Arc to the new mutex).
+            let new = Arc::new(Mutex::new(()));
+            *limiter = Arc::downgrade(&new);
+            let acquired = new.lock_owned().await;
+            drop(state);
+            acquired
+        }
+    }
+}
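A usage sketch of `acquire` (not part of the patch), assuming the `AtMostOne` type above with a unit inner provider: callers racing on the same key take the slot one at a time, a different key is unaffected, and because the map only holds a `Weak`, the per-key mutex is freed once the last guard drops.

use std::sync::Arc;

#[tokio::main]
async fn main() {
    let limiter = Arc::new(AtMostOne::new(()));

    // Hold the "chunk-abc" slot.
    let guard = limiter.acquire("chunk-abc").await;

    let a = limiter.clone();
    let waiter = tokio::spawn(async move {
        // Same key: parks here until the guard below is dropped.
        let _guard = a.acquire("chunk-abc").await;
        "got the chunk-abc slot"
    });

    // Different key: not serialised against "chunk-abc" at all.
    let _other = limiter.acquire("chunk-def").await;

    drop(guard); // release the slot...
    println!("{}", waiter.await.unwrap()); // ...and the spawned task proceeds
}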

+#[async_trait]
+impl<P> Provider for AtMostOne<P>
+where
+    P: Provider,
+    P::Ref: ToString + Send,
+{
+    type Ref = P::Ref;
+    type Set = P::Set;
+
+    async fn lookup(&self, team_id: i32, r: Self::Ref) -> Result<Arc<Self::Set>, Error> {
+        let lock = self.acquire(r.to_string()).await;
+        let result = self.inner.lookup(team_id, r).await;
+        drop(lock);
+        result
+    }
+}
diff --git a/rust/cymbal/src/symbol_store/mod.rs b/rust/cymbal/src/symbol_store/mod.rs
index 458a0c116da..618db75d232 100644
--- a/rust/cymbal/src/symbol_store/mod.rs
+++ b/rust/cymbal/src/symbol_store/mod.rs
@@ -8,6 +8,7 @@ use reqwest::Url;
 use crate::error::Error;
 
 pub mod caching;
+pub mod concurrency;
 pub mod saving;
 pub mod sourcemap;
 
diff --git a/rust/cymbal/src/symbol_store/saving.rs b/rust/cymbal/src/symbol_store/saving.rs
index d231d2400ec..a9b3f0ed19b 100644
--- a/rust/cymbal/src/symbol_store/saving.rs
+++ b/rust/cymbal/src/symbol_store/saving.rs
@@ -147,6 +147,10 @@ where
             error!("Found a record with no data and no error: {:?}", record);
             panic!("Found a record with no data and no error");
         }
+        // TODO - this can fail due to changes in how we serialise, or changes in
+        // the error type - and we should handle that by deleting the symbol record
+        // and re-fetching, I think (we don't need to clean up s3 since it's a failure
+        // case, and there is no saved data).
         let error = serde_json::from_str(&record.failure_reason.unwrap())
             .map_err(UnhandledError::from)?;
         return Err(Error::ResolutionError(error));
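On the saving.rs TODO above: the failure it anticipates is a stored `failure_reason` that no longer matches the current error type. A small, self-contained illustration of that mismatch follows, using a hypothetical `ResolutionFailure` enum rather than cymbal's real error type; the suggested handling (delete the stale record and re-fetch) would hang off the `Err` branch of exactly this kind of deserialisation.

use serde::Deserialize;

// Hypothetical error enum - the current definition no longer has the variant that
// an older build wrote to the database.
#[derive(Deserialize, Debug)]
#[allow(dead_code)]
enum ResolutionFailure {
    MissingSourcemap { url: String },
    InvalidSourcemap,
}

fn main() {
    // Suppose this JSON was serialised by an older version of the enum.
    let stored = r#"{"SourcemapTooLarge":{"bytes":123456}}"#;
    let parsed: Result<ResolutionFailure, _> = serde_json::from_str(stored);
    // With the current enum this is an Err ("unknown variant ..."); the TODO suggests
    // treating that as a stale record (delete and re-fetch) rather than an unhandled error.
    println!("{parsed:?}");
}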