0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-12-01 12:21:02 +01:00
posthog/ee/clickhouse/util.py
Karl-Aksel Puulmann e3bf0cb31d
Session recording on clickhouse, separate tables and retention cronjob (#2051)
* Add scheduled task to wipe session recordings

* Create a new table for session recording

* Save snapshot events to different table

* Use SessionRecordingEvent over Events everywhere

We can remove a ton of cruft this way as well

* Add missing signature

* Extract util from models/event

* Attempt to update ingest side of clickhouse session recording events

Note that it's using the main Kafka topic - not sure if that's a good idea.

* Get separate table in ch working for session recording events

* WIP: query sessions

* Make both session recording queries work

* Make linter happy

* Rebase migration

* Make tests work

* Apply a TTL to session recordings and other configuration:

- toYYYYMMDD partitioning should be smoother with TTL setup
- TTL achieves not needing to archive the data ourselves
- index_granularity will enable smaller reads per session_id
- ORDER BY clause is chosen so that both single-session and time-range
  queries are reasonable

* Convert retention cronjob to new model

* Add tests to process_event changes

* Add test for ee_capture change

* Fixup migration

* Make clickhouse tests drop/create session recording tables

* Make TTL not be there in tests

Otherwise writes get eaten by it during tests when mocking time

* Fix retention task

Co-authored-by: Tim Glaser <tim@glsr.nl>
2020-10-28 21:22:16 +01:00

117 lines
4.0 KiB
Python

from contextlib import contextmanager
import posthoganalytics
from clickhouse_driver.errors import ServerException
from django.conf import settings
from django.db import DEFAULT_DB_ALIAS
from ee.clickhouse.client import sync_execute
from ee.clickhouse.sql.events import (
DROP_EVENTS_TABLE_SQL,
DROP_EVENTS_WITH_ARRAY_PROPS_TABLE_SQL,
DROP_MAT_EVENTS_PROP_TABLE_SQL,
DROP_MAT_EVENTS_WITH_ARRAY_PROPS_TABLE_SQL,
EVENTS_TABLE_SQL,
EVENTS_WITH_PROPS_TABLE_SQL,
MAT_EVENT_PROP_TABLE_SQL,
MAT_EVENTS_WITH_PROPS_TABLE_SQL,
)
from ee.clickhouse.sql.person import (
DROP_MAT_PERSONS_PROP_TABLE_SQL,
DROP_MAT_PERSONS_WITH_ARRAY_PROPS_TABLE_SQL,
DROP_PERSON_DISTINCT_ID_TABLE_SQL,
DROP_PERSON_MATERIALIZED_SQL,
DROP_PERSON_TABLE_SQL,
DROP_PERSON_VIEW_SQL,
DROP_PERSONS_PROP_UP_TO_DATE_VIEW_SQL,
DROP_PERSONS_WITH_ARRAY_PROPS_TABLE_SQL,
MAT_PERSONS_PROP_TABLE_SQL,
MAT_PERSONS_WITH_PROPS_TABLE_SQL,
PERSONS_DISTINCT_ID_TABLE_SQL,
PERSONS_PROP_UP_TO_DATE_VIEW,
PERSONS_TABLE_SQL,
PERSONS_UP_TO_DATE_MATERIALIZED_VIEW,
PERSONS_UP_TO_DATE_VIEW,
PERSONS_WITH_PROPS_TABLE_SQL,
)
from ee.clickhouse.sql.session_recording_events import (
DROP_SESSION_RECORDING_EVENTS_TABLE_SQL,
SESSION_RECORDING_EVENTS_TABLE_SQL,
)
class ClickhouseTestMixin:
    """Test-case mixin that resets the ClickHouse tables after every test.

    ``tearDown`` drops and then recreates the event, person and session
    recording tables by running the SQL statements imported at the top of
    this module through ``sync_execute``. ``assertNumQueries`` is overridden
    so that it never counts or asserts on queries.
    """

    def tearDown(self):
        """Drop and recreate every ClickHouse table the tests use.

        This is best effort: a ``ServerException`` raised by any statement
        stops the remaining steps and is logged as a warning instead of being
        propagated, so a failed reset does not fail the test itself but is
        still visible in the test output.
        """
        # Function-scope import keeps this block independent of the module's
        # import section.
        import logging

        # NOTE(review): super().tearDown() is not called here; confirm that
        # this is intentional for the classes this mixin is combined with.
        try:
            self._destroy_event_tables()
            self._destroy_person_tables()
            self._destroy_session_recording_tables()
            self._create_event_tables()
            self._create_person_tables()
            self._create_session_recording_tables()
        except ServerException:
            logging.getLogger(__name__).warning(
                "ClickHouse table reset failed during tearDown; tables may be in an inconsistent state",
                exc_info=True,
            )

    def _destroy_person_tables(self):
        """Run the DROP statements for the person tables and views."""
        # Statement order is kept exactly as written; presumably views have
        # to go before the tables they read from -- TODO confirm.
        sync_execute(DROP_PERSON_VIEW_SQL)
        sync_execute(DROP_PERSON_MATERIALIZED_SQL)
        sync_execute(DROP_PERSON_TABLE_SQL)
        sync_execute(DROP_PERSON_DISTINCT_ID_TABLE_SQL)
        sync_execute(DROP_PERSONS_PROP_UP_TO_DATE_VIEW_SQL)
        sync_execute(DROP_MAT_PERSONS_PROP_TABLE_SQL)
        sync_execute(DROP_MAT_PERSONS_WITH_ARRAY_PROPS_TABLE_SQL)
        sync_execute(DROP_PERSONS_WITH_ARRAY_PROPS_TABLE_SQL)

    def _create_person_tables(self):
        """Run the CREATE statements for the person tables and views."""
        sync_execute(PERSONS_TABLE_SQL)
        sync_execute(PERSONS_DISTINCT_ID_TABLE_SQL)
        sync_execute(PERSONS_UP_TO_DATE_MATERIALIZED_VIEW)
        sync_execute(PERSONS_UP_TO_DATE_VIEW)
        sync_execute(PERSONS_WITH_PROPS_TABLE_SQL)
        sync_execute(MAT_PERSONS_WITH_PROPS_TABLE_SQL)
        sync_execute(MAT_PERSONS_PROP_TABLE_SQL)
        sync_execute(PERSONS_PROP_UP_TO_DATE_VIEW)

    def _destroy_session_recording_tables(self):
        """Run the DROP statement for the session recording events table."""
        sync_execute(DROP_SESSION_RECORDING_EVENTS_TABLE_SQL)

    def _create_session_recording_tables(self):
        """Run the CREATE statement for the session recording events table."""
        sync_execute(SESSION_RECORDING_EVENTS_TABLE_SQL)

    def _destroy_event_tables(self):
        """Run the DROP statements for the event tables."""
        sync_execute(DROP_EVENTS_TABLE_SQL)
        sync_execute(DROP_EVENTS_WITH_ARRAY_PROPS_TABLE_SQL)
        sync_execute(DROP_MAT_EVENTS_WITH_ARRAY_PROPS_TABLE_SQL)
        sync_execute(DROP_MAT_EVENTS_PROP_TABLE_SQL)

    def _create_event_tables(self):
        """Run the CREATE statements for the event tables."""
        sync_execute(EVENTS_TABLE_SQL)
        sync_execute(EVENTS_WITH_PROPS_TABLE_SQL)
        sync_execute(MAT_EVENTS_WITH_PROPS_TABLE_SQL)
        sync_execute(MAT_EVENT_PROP_TABLE_SQL)

    @contextmanager
    def _assertNumQueries(self, func):
        """Context manager that does nothing on enter or exit.

        ``func`` is accepted for the caller's convenience but is not used.
        """
        yield

    # Ignore assertNumQueries in clickhouse tests
    def assertNumQueries(self, num, func=None, *args, using=DEFAULT_DB_ALIAS, **kwargs):
        """Stand-in for ``assertNumQueries`` that never checks the query count.

        ``num`` and ``using`` are accepted and ignored.

        Without ``func`` a no-op context manager is returned, for use as
        ``with self.assertNumQueries(n): ...``. With ``func``, the callable
        is invoked as ``func(*args, **kwargs)`` inside that no-op context, so
        the code under test still runs when the callable form is used.
        """
        context = self._assertNumQueries(func)
        if func is None:
            return context
        with context:
            func(*args, **kwargs)
# String identifiers, one per ClickHouse-backed endpoint. Presumably these are
# the values passed as `endpoint_flag` to `endpoint_enabled` below -- TODO
# confirm against callers.
CH_PERSON_ENDPOINT = "ch-person-endpoint"
CH_EVENT_ENDPOINT = "ch-event-endpoint"
CH_ACTION_ENDPOINT = "ch-action-endpoint"
CH_TREND_ENDPOINT = "ch-trend-endpoint"
CH_SESSION_ENDPOINT = "ch-session-endpoint"
CH_PATH_ENDPOINT = "ch-path-endpoint"
CH_FUNNEL_ENDPOINT = "ch-funnel-endpoint"
CH_RETENTION_ENDPOINT = "ch-retention-endpoint"
def endpoint_enabled(endpoint_flag: str, distinct_id: str):
    """Tell whether the endpoint guarded by ``endpoint_flag`` should be used.

    The feature flag is looked up for ``distinct_id`` first; if that result
    is falsy, ``settings.DEBUG`` and then ``settings.TEST`` are consulted.
    The first truthy value found is returned, otherwise the value of
    ``settings.TEST``.
    """
    flag_state = posthoganalytics.feature_enabled(endpoint_flag, distinct_id)
    if flag_state:
        return flag_state

    debug_mode = settings.DEBUG
    if debug_mode:
        return debug_mode

    return settings.TEST