mirror of https://github.com/PostHog/posthog.git (synced 2024-11-21 13:39:22 +01:00)
fix(err): assorted perf/concurrency things (#26235)
This commit is contained in:
parent ead289eaeb, commit afeb5529ce
@@ -16,6 +16,7 @@ use crate::{
     frames::resolver::Resolver,
     symbol_store::{
         caching::{Caching, SymbolSetCache},
+        concurrency,
         saving::Saving,
         sourcemap::SourcemapProvider,
         Catalog, S3Client,
@@ -80,13 +81,18 @@ impl AppContext {
             config.ss_prefix.clone(),
         );
         let caching_smp = Caching::new(saving_smp, ss_cache);
+        // We want to fetch each sourcemap from the outside world
+        // exactly once, and if it isn't in the cache, load/parse
+        // it from s3 exactly once too. Limiting the per symbol set
+        // reference concurrency to 1 ensures this.
+        let limited_smp = concurrency::AtMostOne::new(caching_smp);

         info!(
             "AppContext initialized, subscribed to topic {}",
             config.consumer.kafka_consumer_topic
         );

-        let catalog = Catalog::new(caching_smp);
+        let catalog = Catalog::new(limited_smp);
         let resolver = Resolver::new(config);

         Ok(Self {
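The wiring above is a plain decorator-style composition: the saving/sourcemap stack is wrapped by the cache, which is wrapped by the concurrency limiter, and the catalog only ever sees the outermost layer. Below is a minimal, self-contained sketch of that shape - the struct names mirror the real ones, but the async `Provider` trait, constructor arguments, and catalog are simplified, hypothetical stand-ins:

// Hypothetical stand-ins; the real types live in cymbal's symbol_store module.
struct SourcemapStack;                 // stands in for the saving + sourcemap layers
struct Caching<P> { inner: P }         // caches parsed symbol sets
struct AtMostOne<P> { inner: P }       // allows at most one in-flight fetch per symbol set
struct Catalog<P> { provider: P }      // what the resolver talks to

impl<P> Caching<P> {
    fn new(inner: P) -> Self { Self { inner } }
}
impl<P> AtMostOne<P> {
    fn new(inner: P) -> Self { Self { inner } }
}
impl<P> Catalog<P> {
    fn new(provider: P) -> Self { Self { provider } }
}

fn main() {
    // Each layer wraps the previous one; only the outermost layer is handed to the
    // catalog, so every lookup passes the concurrency limit before it can trigger a fetch.
    let caching_smp = Caching::new(SourcemapStack);
    let limited_smp = AtMostOne::new(caching_smp);
    let _catalog = Catalog::new(limited_smp);
}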
@@ -1,4 +1,4 @@
-use std::collections::HashMap;
+use std::sync::Arc;

 use app_context::AppContext;
 use common_types::ClickHouseEvent;
@@ -20,7 +20,7 @@ pub mod symbol_store;
 pub mod types;

 pub async fn handle_event(
-    context: &AppContext,
+    context: Arc<AppContext>,
     mut event: ClickHouseEvent,
 ) -> Result<Option<ClickHouseEvent>, UnhandledError> {
     let mut props = match get_props(&event) {
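The switch from `&AppContext` to `Arc<AppContext>` is what makes the task-per-frame resolution later in this diff possible: `tokio::spawn` requires a `'static` future, so a borrowed context cannot be captured by a spawned task, while a cloned `Arc` can be moved in cheaply. A toy sketch of the constraint (the `AppContext` here is a hypothetical stand-in):

use std::sync::Arc;

struct AppContext {
    topic: String, // hypothetical field, just to have something to read
}

#[tokio::main]
async fn main() {
    let context = Arc::new(AppContext { topic: "events".to_string() });

    // A `&AppContext` could not be captured here, because the spawned task may outlive
    // the caller's stack frame; an owned Arc clone satisfies the 'static requirement.
    let ctx = context.clone();
    let handle = tokio::spawn(async move { ctx.topic.len() });

    assert_eq!(handle.await.unwrap(), context.topic.len());
}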
@@ -45,7 +45,7 @@ pub async fn handle_event(
         // If we get an unhandled error during exception processing, we return an error, which should
         // cause the caller to drop the offset without storing it - unhandled exceptions indicate
         // a dependency is down, or some bug, and we want to take lag in those situations.
-        results.push(process_exception(context, event.team_id, exception).await?);
+        results.push(process_exception(context.clone(), event.team_id, exception).await?);
     }

     let fingerprint = generate_fingerprint(&results);
@@ -79,7 +79,7 @@ fn get_props(event: &ClickHouseEvent) -> Result<RawErrProps, EventError> {
 }

 async fn process_exception(
-    context: &AppContext,
+    context: Arc<AppContext>,
     team_id: i32,
     mut e: Exception,
 ) -> Result<Exception, UnhandledError> {
@@ -96,40 +96,43 @@ async fn process_exception(
         return Ok(e);
     }

-    let mut results = Vec::with_capacity(frames.len());
+    let mut handles = Vec::with_capacity(frames.len());
+    let mut resolved_frames = Vec::with_capacity(frames.len());

-    // Cluster the frames by symbol set
-    // TODO - we really want to cluster across exceptions (and even across events),
-    // rather than just within a single exception
-    let mut groups = HashMap::new();
-    for (i, frame) in frames.into_iter().enumerate() {
-        let group = groups
-            .entry(frame.symbol_set_ref())
-            .or_insert_with(Vec::new);
-        group.push((i, frame));
-    }
-
-    for (_, frames) in groups.into_iter() {
-        for (i, frame) in frames {
-            let resolved_frame = context
+    for frame in frames.into_iter() {
+        let context = context.clone();
+        // Spawn a concurrent task for resolving every frame - we're careful elsewhere to
+        // ensure this kind of concurrency is fine, although this "throw it at the wall"
+        // data flow structure is pretty questionable. Once we switch to handling more than
+        // 1 event at a time, we should re-group frames into associated groups and then
+        // process those groups in-order (but the individual frames in them can still be
+        // thrown at the wall), with some cross-group concurrency.
+        handles.push(tokio::spawn(async move {
+            context
                 .resolver
                 .resolve(&frame, team_id, &context.pool, &context.catalog)
-                .await?;
-            results.push((i, resolved_frame));
-        }
+                .await
+        }));
     }

-    results.sort_unstable_by_key(|(i, _)| *i);
+    // Collect the results
+    for handle in handles {
+        // JoinHandles wrap the returned type in a Result, because if the task panics,
+        // tokio catches it and returns an error. If any of our tasks panicked, we want
+        // to propagate that panic, so we unwrap the outer Result here.
+        let res = handle.await.unwrap()?;
+        resolved_frames.push(res)
+    }

     e.stack = Some(Stacktrace::Resolved {
-        frames: results.into_iter().map(|(_, frame)| frame).collect(),
+        frames: resolved_frames,
     });

     Ok(e)
 }

-// This is stupidly expensive, since it round-trips the event through JSON, lol. We should change ClickhouseEvent to only do serde at the
-// edges
+// This is expensive, since it round-trips the event through JSON.
+// We could maybe change ClickhouseEvent to only do serde at the edges
 pub fn add_error_to_event(
     event: &mut ClickHouseEvent,
     e: impl ToString,
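The comments above describe the new data flow: one spawned task per frame, then the handles are awaited in the order they were pushed, which keeps frames in their original order without the old index-and-sort bookkeeping. A stripped-down sketch of that pattern, with the resolver replaced by a trivial async function:

// `resolve` is a hypothetical stand-in for context.resolver.resolve(...).
async fn resolve(frame: u32) -> Result<String, String> {
    Ok(format!("resolved-{frame}"))
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let frames = vec![3u32, 1, 2];
    let mut handles = Vec::with_capacity(frames.len());
    let mut resolved_frames = Vec::with_capacity(frames.len());

    for frame in frames {
        // One task per frame; the tasks run concurrently on the runtime.
        handles.push(tokio::spawn(async move { resolve(frame).await }));
    }

    for handle in handles {
        // Awaiting in push order preserves the original frame order, and the
        // outer unwrap re-raises any panic that happened inside a task.
        resolved_frames.push(handle.await.unwrap()?);
    }

    assert_eq!(resolved_frames, vec!["resolved-3", "resolved-1", "resolved-2"]);
    Ok(())
}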
@@ -78,7 +78,7 @@ async fn main() {
         };
         metrics::counter!(EVENT_RECEIVED).increment(1);

-        let _processed_event = match handle_event(&context, event).await {
+        let _processed_event = match handle_event(context.clone(), event).await {
             Ok(r) => {
                 offset.store().unwrap();
                 r
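The main loop only stores the Kafka offset when `handle_event` returns `Ok`, matching the comment earlier in the diff: on an unhandled error the offset is dropped, the event is re-consumed, and the service takes lag rather than losing data. A schematic sketch of that at-least-once shape, with hypothetical `Offset` and event stand-ins rather than the real consumer types:

struct Offset;
impl Offset {
    // Hypothetical stand-in for committing the offset back to the consumer.
    fn store(self) -> Result<(), String> {
        Ok(())
    }
}

fn handle_event(event: &str) -> Result<Option<String>, String> {
    Ok(Some(event.to_uppercase())) // stand-in for the real processing pipeline
}

fn main() {
    let (offset, event) = (Offset, "an exception event");
    match handle_event(event) {
        Ok(_processed_event) => {
            // Only a fully handled event advances the offset.
            offset.store().unwrap();
        }
        Err(e) => {
            // Not committing means the event will be seen again:
            // preferable to dropping it while a dependency is down.
            eprintln!("unhandled error, offset not stored: {e}");
        }
    }
}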
@@ -12,6 +12,8 @@ use crate::{

 use super::{saving::Saveable, Fetcher, Parser, Provider};

+// This is a type-specific symbol provider layer, designed to
+// wrap some inner provider and provide a type-safe caching layer
 pub struct Caching<P> {
     inner: P,
     cache: Arc<Mutex<SymbolSetCache>>,
@@ -42,16 +44,30 @@ where
             return Ok(set);
         }
         metrics::counter!(STORE_CACHE_MISSES).increment(1);
+        drop(cache);
+
+        // Do the fetch, not holding the lock across it to allow
+        // concurrent fetches to occur (de-duping fetches is
+        // up to the caller of `lookup`, since relying on the
+        // cache to do it means assuming the caching layer is
+        // the outer layer, which is not something the interface
+        // guarantees)
         let found = self.inner.fetch(team_id, r).await?;
         let bytes = found.byte_count();
         let parsed = self.inner.parse(found).await?;

+        let mut cache = self.cache.lock().await; // Re-acquire the cache-wide lock to insert, dropping the ref_lock
+
         let parsed = Arc::new(parsed);
         cache.insert(cache_key, parsed.clone(), bytes);
         Ok(parsed)
     }
 }

+// This is a cache shared across multiple symbol set providers, through the `Caching` above,
+// such that two totally different "layers" can share an underlying "pool" of cache space. This
+// is injected into the `Caching` layer at construct time, to allow this sharing across multiple
+// provider layer "stacks" within the catalog.
 pub struct SymbolSetCache {
     // We expect this cache to consist of few, but large, items.
     // TODO - handle cases where two CachedSymbolSets have identical keys but different types
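The lookup path above holds the cache-wide lock only to check for a hit and, later, to insert the parsed result; it deliberately drops the lock across the fetch/parse so unrelated symbol sets can be fetched in parallel, leaving de-duplication of identical fetches to the `AtMostOne` wrapper. A minimal sketch of that locking discipline, with a plain `HashMap` standing in for `SymbolSetCache`:

use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;

async fn fetch_and_parse(key: &str) -> String {
    format!("parsed({key})") // stand-in for the fetch + parse steps
}

async fn lookup(cache: Arc<Mutex<HashMap<String, Arc<String>>>>, key: &str) -> Arc<String> {
    // Check under the lock...
    let guard = cache.lock().await;
    if let Some(hit) = guard.get(key) {
        return hit.clone();
    }
    drop(guard); // ...but don't hold it across the slow fetch/parse.

    let parsed = Arc::new(fetch_and_parse(key).await);

    // Re-acquire only to insert.
    cache.lock().await.insert(key.to_string(), parsed.clone());
    parsed
}

#[tokio::main]
async fn main() {
    let cache = Arc::new(Mutex::new(HashMap::new()));
    let first = lookup(cache.clone(), "chunk-1").await;
    let second = lookup(cache, "chunk-1").await;
    assert_eq!(first, second);
}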
@@ -113,16 +129,14 @@ impl SymbolSetCache {
         // remove them in a separate pass.
         let mut to_remove = vec![];
         while self.held_bytes > self.max_bytes && !vals.is_empty() {
-            // We can unwrap here because we know we're not empty from the line above
+            // We can unwrap here because we know we're not empty from the line above (and
+            // really, even the !empty check could be skipped - if held_bytes is non-zero, we
+            // must have at least one element in vals)
             let (to_remove_key, to_remove_val) = vals.pop().unwrap();
             self.held_bytes -= to_remove_val.bytes;
             to_remove.push(to_remove_key.clone());
         }

         for key in to_remove {
             self.cached.remove(&key);
         }

         metrics::gauge!(STORE_CACHED_BYTES).set(self.held_bytes as f64);
     }
 }
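Both hunks above operate on a `SymbolSetCache` that is deliberately shared: a single `Arc<Mutex<SymbolSetCache>>` is injected into every `Caching` layer, so all provider stacks draw on one byte budget and the eviction pass frees space for whichever stack needs it. A tiny sketch of that injection, with simplified synchronous stand-ins:

use std::sync::{Arc, Mutex};

// Stand-in for SymbolSetCache: one byte budget shared by all of its users.
struct SharedCache {
    held_bytes: usize,
    max_bytes: usize,
}

// Stand-in for the Caching layer: each instance holds a handle to the same cache.
struct Caching<P> {
    inner: P,
    cache: Arc<Mutex<SharedCache>>,
}

impl<P> Caching<P> {
    fn new(inner: P, cache: Arc<Mutex<SharedCache>>) -> Self {
        Self { inner, cache }
    }
}

fn main() {
    let shared = Arc::new(Mutex::new(SharedCache { held_bytes: 0, max_bytes: 1_000_000 }));

    // Two different provider "stacks" share the same pool of cache space,
    // so bytes held by one count against the budget of the other.
    let sourcemaps = Caching::new("sourcemap provider", shared.clone());
    let other = Caching::new("another provider", shared);
    assert!(Arc::ptr_eq(&sourcemaps.cache, &other.cache));
    let _ = (sourcemaps.inner, other.inner);
}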
rust/cymbal/src/symbol_store/concurrency.rs (new file, 79 lines)
@@ -0,0 +1,79 @@
+use std::{
+    collections::HashMap,
+    sync::{Arc, Weak},
+};
+
+use axum::async_trait;
+use tokio::sync::{Mutex, OwnedMutexGuard};
+
+use crate::error::Error;
+
+use super::Provider;
+
+// Limits the number of concurrent lookups
+// for a given symbol set to 1. Note this places
+// no concurrency limit /across/ different symbol
+// sets, and places no limit on the number of users
+// using the returned symbol set concurrently. Designed
+// to wrap the caching/saving layers, allowing us to
+// ensure we only fetch any given symbol set from the
+// outside world exactly once
+pub struct AtMostOne<P> {
+    pub inner: P,
+    limiters: Mutex<HashMap<String, Weak<Mutex<()>>>>,
+}
+
+impl<P> AtMostOne<P> {
+    pub fn new(inner: P) -> Self {
+        Self {
+            inner,
+            limiters: Default::default(),
+        }
+    }
+
+    // This needs to be async even though all it does is take a lock because
+    // the returned owned guard can be (and is) held across an await point, so
+    // if this was a sync mutex it'd block the executor. It so happens that the
+    // std library Mutex doesn't provide lock_owned anyway, so we'd have to pull
+    // in a new dependency if we wanted to write a sync version of this, but
+    // that's secondary to it actually needing to be async
+    pub async fn acquire(&self, key: impl ToString) -> OwnedMutexGuard<()> {
+        let key = key.to_string();
+        let mut state = self.limiters.lock().await;
+        let limiter = state.entry(key).or_default();
+
+        if let Some(lock) = limiter.upgrade() {
+            // If there's already a mutex in our shared state for this particular
+            // source ref, drop the global lock, and wait for the underlying source
+            // ref to be freed up
+            drop(state);
+            lock.lock_owned().await
+        } else {
+            // If there's no mutex in our shared state for this particular source ref,
+            // create one, acquire it, put a Weak to it in the shared state, and return
+            // the owning mutex guard (and therefore the underlying Arc to the new mutex)
+            let new = Arc::new(Mutex::new(()));
+            *limiter = Arc::downgrade(&new);
+            let acquired = new.lock_owned().await;
+            drop(state);
+            acquired
+        }
+    }
+}
+
+#[async_trait]
+impl<P> Provider for AtMostOne<P>
+where
+    P: Provider,
+    P::Ref: ToString + Send,
+{
+    type Ref = P::Ref;
+    type Set = P::Set;
+
+    async fn lookup(&self, team_id: i32, r: Self::Ref) -> Result<Arc<Self::Set>, Error> {
+        let lock = self.acquire(r.to_string()).await;
+        let result = self.inner.lookup(team_id, r).await;
+        drop(lock);
+        result
+    }
+}
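A note on the `Weak<Mutex<()>>` map in the new file: a per-key lock only stays alive while some caller holds or is waiting on it, so the map never pins memory for keys nobody is touching, and the next acquire for a dead key simply creates a fresh mutex. A small standalone sketch of the same pattern, stripped of the `Provider` plumbing (hypothetical `KeyedLocks` type):

use std::{collections::HashMap, sync::{Arc, Weak}};
use tokio::sync::{Mutex, OwnedMutexGuard};

#[derive(Default)]
struct KeyedLocks {
    limiters: Mutex<HashMap<String, Weak<Mutex<()>>>>,
}

impl KeyedLocks {
    async fn acquire(&self, key: &str) -> OwnedMutexGuard<()> {
        let mut state = self.limiters.lock().await;
        let limiter = state.entry(key.to_string()).or_default();
        if let Some(existing) = limiter.upgrade() {
            drop(state); // don't hold the map lock while waiting on the key lock
            existing.lock_owned().await
        } else {
            let fresh = Arc::new(Mutex::new(()));
            *limiter = Arc::downgrade(&fresh);
            let guard = fresh.lock_owned().await;
            drop(state);
            guard
        }
    }
}

#[tokio::main]
async fn main() {
    let locks = Arc::new(KeyedLocks::default());

    // Two tasks asking for the same key run their critical sections one at a time;
    // a task using a different key is not blocked at all.
    let a = {
        let locks = locks.clone();
        tokio::spawn(async move { let _g = locks.acquire("chunk-1").await; /* fetch once */ })
    };
    let b = {
        let locks = locks.clone();
        tokio::spawn(async move { let _g = locks.acquire("chunk-1").await; /* or wait, then hit cache */ })
    };
    let _ = tokio::join!(a, b);
}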
@@ -8,6 +8,7 @@ use reqwest::Url;
 use crate::error::Error;

 pub mod caching;
+pub mod concurrency;
 pub mod saving;
 pub mod sourcemap;

@@ -147,6 +147,10 @@ where
             error!("Found a record with no data and no error: {:?}", record);
             panic!("Found a record with no data and no error");
         }
+        // TODO - this can fail due to changes in how we serialise, or changes in
+        // the error type - and we should handle that by deleting the symbol record
+        // and re-fetching, I think (we don't need to cleanup s3 since it's a failure
+        // case, there is no saved data).
         let error = serde_json::from_str(&record.failure_reason.unwrap())
             .map_err(UnhandledError::from)?;
         return Err(Error::ResolutionError(error));