diff --git a/posthog/caching/insight_cache.py b/posthog/caching/insight_cache.py index 8bdefe471b9..faf78c0eb4f 100644 --- a/posthog/caching/insight_cache.py +++ b/posthog/caching/insight_cache.py @@ -1,5 +1,4 @@ from datetime import datetime, timedelta -from time import perf_counter from typing import Any, Optional from uuid import UUID @@ -8,9 +7,8 @@ from django.conf import settings from django.core.cache import cache from django.db import connection from django.utils.timezone import now -from prometheus_client import Counter +from prometheus_client import Counter, Gauge, Histogram from sentry_sdk.api import capture_exception -from statshog.defaults.django import statsd from posthog.api.services.query import process_query_dict from posthog.clickhouse.query_tagging import tag_queries @@ -25,7 +23,26 @@ logger = structlog.get_logger(__name__) REQUEUE_DELAY = timedelta(hours=2) MAX_ATTEMPTS = 3 -insight_cache_write_counter = Counter("posthog_cloud_insight_cache_write", "A write to the redis insight cache") +INSIGHT_CACHE_WRITE_COUNTER = Counter("posthog_cloud_insight_cache_write", "A write to the redis insight cache") + +CACHE_UPDATE_SKIPPED_COUNTER = Counter( + "insight_cache_state_update_skipped", "Insight caching state is within target cache age and was not refreshed" +) +CACHE_UPDATE_SUCCEEDED_COUNTER = Counter( + "insight_cache_state_update_succeeded", "Insight cache was successfully refreshed", labelnames=["is_dashboard"] +) +CACHE_UPDATE_FAILED_COUNTER = Counter( + "insight_cache_state_update_failed", "Insight cache refresh failed", labelnames=["is_dashboard"] +) +CACHE_UPDATE_SHARED_GAUGE = Gauge( + "insight_cache_state_update_rows_updated", + "Number of rows updated during insight cache refresh. A single cache key can be shared by more than one insight/tile.", +) +CACHE_UPDATE_TIMING = Histogram( + "insight_cache_state_update_timing", + "Time spent updating the cache", + buckets=[0.1, 0.5, 1, 1.5, 2, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 120, 240], +) def schedule_cache_updates(): @@ -89,11 +106,10 @@ def update_cache(caching_state_id: UUID): caching_state.last_refresh is not None and now() - caching_state.last_refresh < timedelta(seconds=caching_state.target_cache_age_seconds) ): - statsd.incr("caching_state_update_skipped") + CACHE_UPDATE_SKIPPED_COUNTER.inc() return insight, dashboard = _extract_insight_dashboard(caching_state) - start_time = perf_counter() exception: Optional[Exception] = None cache_key: Optional[str] = None @@ -126,42 +142,39 @@ def update_cache(caching_state_id: UUID): capture_exception(err, metadata) exception = err - duration = perf_counter() - start_time - if exception is None: - assert cache_key is not None - timestamp = now() - rows_updated = update_cached_state( - caching_state.team_id, - cache_key, - timestamp, - {"result": result, "type": cache_type, "last_refresh": timestamp} if result is not None else None, - ) - statsd.incr("caching_state_update_success") - statsd.incr("caching_state_update_rows_updated", rows_updated) - statsd.timing("caching_state_update_success_timing", duration) - logger.warn( - "Re-calculated insight cache", - rows_updated=rows_updated, - duration=duration, - **metadata, - ) - else: - logger.warn( - "Failed to re-calculate insight cache", - exception=exception, - duration=duration, - **metadata, - refresh_attempt=caching_state.refresh_attempt, - ) - statsd.incr("caching_state_update_errors") + with CACHE_UPDATE_TIMING.time(): + if exception is None: + assert cache_key is not None + timestamp = now() + rows_updated = update_cached_state( + caching_state.team_id, + cache_key, + timestamp, + {"result": result, "type": cache_type, "last_refresh": timestamp} if result is not None else None, + ) + CACHE_UPDATE_SUCCEEDED_COUNTER.labels(is_dashboard=dashboard is not None).inc() + CACHE_UPDATE_SHARED_GAUGE.inc(rows_updated) + logger.warn( + "Re-calculated insight cache", + rows_updated=rows_updated, + **metadata, + ) + else: + logger.warn( + "Failed to re-calculate insight cache", + exception=exception, + **metadata, + refresh_attempt=caching_state.refresh_attempt, + ) + CACHE_UPDATE_FAILED_COUNTER.labels(is_dashboard=dashboard is not None).inc() - if caching_state.refresh_attempt < MAX_ATTEMPTS: - update_cache_task.apply_async(args=[caching_state_id], countdown=timedelta(minutes=10).total_seconds()) + if caching_state.refresh_attempt < MAX_ATTEMPTS: + update_cache_task.apply_async(args=[caching_state_id], countdown=timedelta(minutes=10).total_seconds()) - InsightCachingState.objects.filter(pk=caching_state.pk).update( - refresh_attempt=caching_state.refresh_attempt + 1, - last_refresh_queued_at=now(), - ) + InsightCachingState.objects.filter(pk=caching_state.pk).update( + refresh_attempt=caching_state.refresh_attempt + 1, + last_refresh_queued_at=now(), + ) def update_cached_state( @@ -173,7 +186,7 @@ def update_cached_state( ): if result is not None: # This is particularly the case for HogQL-based queries, which cache.set() on their own cache.set(cache_key, result, ttl if ttl is not None else settings.CACHED_RESULTS_TTL) - insight_cache_write_counter.inc() + INSIGHT_CACHE_WRITE_COUNTER.inc() # :TRICKY: We update _all_ states with same cache_key to avoid needless re-calculations and # handle race conditions around cache_key changing.