0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-11-21 13:39:22 +01:00

chore: switch insight cache state to prom (#22718)

This commit is contained in:
Paul D'Ambra 2024-06-05 19:13:29 +01:00 committed by GitHub
parent f592bd5340
commit aaeae7fcd2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,5 +1,4 @@
from datetime import datetime, timedelta
from time import perf_counter
from typing import Any, Optional
from uuid import UUID
@ -8,9 +7,8 @@ from django.conf import settings
from django.core.cache import cache
from django.db import connection
from django.utils.timezone import now
from prometheus_client import Counter
from prometheus_client import Counter, Gauge, Histogram
from sentry_sdk.api import capture_exception
from statshog.defaults.django import statsd
from posthog.api.services.query import process_query_dict
from posthog.clickhouse.query_tagging import tag_queries
@ -25,7 +23,26 @@ logger = structlog.get_logger(__name__)
# Tunables and Prometheus metrics for the insight cache refresh code in this module.

# NOTE(review): presumably the minimum wait before a caching state is queued again;
# its use is not visible in this excerpt, confirm against schedule_cache_updates.
REQUEUE_DELAY = timedelta(hours=2)
# update_cache only re-enqueues a failed refresh while refresh_attempt is below this value.
MAX_ATTEMPTS = 3
# NOTE(review): this assignment and the next one both create a Counter named
# "posthog_cloud_insight_cache_write". They look like the before/after sides of a
# rename; prometheus_client refuses duplicate metric names in one registry, so
# confirm that only one of the two is meant to exist.
insight_cache_write_counter = Counter("posthog_cloud_insight_cache_write", "A write to the redis insight cache")
# Incremented in update_cached_state after a result is written with cache.set().
INSIGHT_CACHE_WRITE_COUNTER = Counter("posthog_cloud_insight_cache_write", "A write to the redis insight cache")
# Incremented when update_cache returns early because last_refresh is still
# within target_cache_age_seconds.
CACHE_UPDATE_SKIPPED_COUNTER = Counter(
"insight_cache_state_update_skipped", "Insight caching state is within target cache age and was not refreshed"
)
# Incremented on a successful refresh; the is_dashboard label is set from
# `dashboard is not None` at the call site.
CACHE_UPDATE_SUCCEEDED_COUNTER = Counter(
"insight_cache_state_update_succeeded", "Insight cache was successfully refreshed", labelnames=["is_dashboard"]
)
# Incremented when the refresh raised; labelled the same way as the success counter.
CACHE_UPDATE_FAILED_COUNTER = Counter(
"insight_cache_state_update_failed", "Insight cache refresh failed", labelnames=["is_dashboard"]
)
# NOTE(review): the visible call site only ever calls .inc(rows_updated) on this
# gauge, so it accumulates like a running total rather than a point-in-time
# value; confirm that is the intended reading.
CACHE_UPDATE_SHARED_GAUGE = Gauge(
"insight_cache_state_update_rows_updated",
"Number of rows updated during insight cache refresh. A single cache key can be shared by more than one insight/tile.",
)
# Used via CACHE_UPDATE_TIMING.time() in update_cache. Bucket upper bounds are
# in seconds (the unit Histogram.time() observes), ranging from 0.1s to 240s.
# NOTE(review): in the visible code the timed region starts after the
# calculation's exception handling, so it may not include the query itself; confirm.
CACHE_UPDATE_TIMING = Histogram(
"insight_cache_state_update_timing",
"Time spent updating the cache",
buckets=[0.1, 0.5, 1, 1.5, 2, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 120, 240],
)
def schedule_cache_updates():
@ -89,11 +106,10 @@ def update_cache(caching_state_id: UUID):
caching_state.last_refresh is not None
and now() - caching_state.last_refresh < timedelta(seconds=caching_state.target_cache_age_seconds)
):
statsd.incr("caching_state_update_skipped")
CACHE_UPDATE_SKIPPED_COUNTER.inc()
return
insight, dashboard = _extract_insight_dashboard(caching_state)
start_time = perf_counter()
exception: Optional[Exception] = None
cache_key: Optional[str] = None
@ -126,42 +142,39 @@ def update_cache(caching_state_id: UUID):
capture_exception(err, metadata)
exception = err
duration = perf_counter() - start_time
if exception is None:
assert cache_key is not None
timestamp = now()
rows_updated = update_cached_state(
caching_state.team_id,
cache_key,
timestamp,
{"result": result, "type": cache_type, "last_refresh": timestamp} if result is not None else None,
)
statsd.incr("caching_state_update_success")
statsd.incr("caching_state_update_rows_updated", rows_updated)
statsd.timing("caching_state_update_success_timing", duration)
logger.warn(
"Re-calculated insight cache",
rows_updated=rows_updated,
duration=duration,
**metadata,
)
else:
logger.warn(
"Failed to re-calculate insight cache",
exception=exception,
duration=duration,
**metadata,
refresh_attempt=caching_state.refresh_attempt,
)
statsd.incr("caching_state_update_errors")
with CACHE_UPDATE_TIMING.time():
if exception is None:
assert cache_key is not None
timestamp = now()
rows_updated = update_cached_state(
caching_state.team_id,
cache_key,
timestamp,
{"result": result, "type": cache_type, "last_refresh": timestamp} if result is not None else None,
)
CACHE_UPDATE_SUCCEEDED_COUNTER.labels(is_dashboard=dashboard is not None).inc()
CACHE_UPDATE_SHARED_GAUGE.inc(rows_updated)
logger.warn(
"Re-calculated insight cache",
rows_updated=rows_updated,
**metadata,
)
else:
logger.warn(
"Failed to re-calculate insight cache",
exception=exception,
**metadata,
refresh_attempt=caching_state.refresh_attempt,
)
CACHE_UPDATE_FAILED_COUNTER.labels(is_dashboard=dashboard is not None).inc()
if caching_state.refresh_attempt < MAX_ATTEMPTS:
update_cache_task.apply_async(args=[caching_state_id], countdown=timedelta(minutes=10).total_seconds())
if caching_state.refresh_attempt < MAX_ATTEMPTS:
update_cache_task.apply_async(args=[caching_state_id], countdown=timedelta(minutes=10).total_seconds())
InsightCachingState.objects.filter(pk=caching_state.pk).update(
refresh_attempt=caching_state.refresh_attempt + 1,
last_refresh_queued_at=now(),
)
InsightCachingState.objects.filter(pk=caching_state.pk).update(
refresh_attempt=caching_state.refresh_attempt + 1,
last_refresh_queued_at=now(),
)
def update_cached_state(
@ -173,7 +186,7 @@ def update_cached_state(
):
if result is not None: # This is particularly the case for HogQL-based queries, which cache.set() on their own
cache.set(cache_key, result, ttl if ttl is not None else settings.CACHED_RESULTS_TTL)
insight_cache_write_counter.inc()
INSIGHT_CACHE_WRITE_COUNTER.inc()
# :TRICKY: We update _all_ states with same cache_key to avoid needless re-calculations and
# handle race conditions around cache_key changing.