From 4ff32c5997ca9de6dee8ef01eb5558d09dfa789f Mon Sep 17 00:00:00 2001
From: Daesgar
Date: Wed, 20 Nov 2024 12:33:08 +0100
Subject: [PATCH] feat: create events table to store last 7 days of data
 (#26239)

---
 posthog/clickhouse/kafka_engine.py            |  2 +
 .../migrations/0086_events_recent_table.py    | 14 ++++
 posthog/models/event/sql.py                   | 84 ++++++++++++++++++-
 3 files changed, 99 insertions(+), 1 deletion(-)
 create mode 100644 posthog/clickhouse/migrations/0086_events_recent_table.py

diff --git a/posthog/clickhouse/kafka_engine.py b/posthog/clickhouse/kafka_engine.py
index abfe456bf6a..4b30047fc7d 100644
--- a/posthog/clickhouse/kafka_engine.py
+++ b/posthog/clickhouse/kafka_engine.py
@@ -35,6 +35,8 @@ KAFKA_COLUMNS_WITH_PARTITION = """
 , _partition UInt64
 """
 
+KAFKA_TIMESTAMP_MS_COLUMN = "_timestamp_ms DateTime64"
+
 
 def kafka_engine(topic: str, kafka_host: str | None = None, group="group1", serialization="JSONEachRow") -> str:
     if kafka_host is None:
diff --git a/posthog/clickhouse/migrations/0086_events_recent_table.py b/posthog/clickhouse/migrations/0086_events_recent_table.py
new file mode 100644
index 00000000000..e672cab01e0
--- /dev/null
+++ b/posthog/clickhouse/migrations/0086_events_recent_table.py
@@ -0,0 +1,14 @@
+from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
+from posthog.models.event.sql import (
+    EVENTS_RECENT_TABLE_JSON_MV_SQL,
+    EVENTS_RECENT_TABLE_SQL,
+    KAFKA_EVENTS_RECENT_TABLE_JSON_SQL,
+    DISTRIBUTED_EVENTS_RECENT_TABLE_SQL,
+)
+
+operations = [
+    run_sql_with_exceptions(EVENTS_RECENT_TABLE_SQL()),
+    run_sql_with_exceptions(KAFKA_EVENTS_RECENT_TABLE_JSON_SQL()),
+    run_sql_with_exceptions(EVENTS_RECENT_TABLE_JSON_MV_SQL()),
+    run_sql_with_exceptions(DISTRIBUTED_EVENTS_RECENT_TABLE_SQL()),
+]
diff --git a/posthog/models/event/sql.py b/posthog/models/event/sql.py
index 89beb3739c2..67f5ac97325 100644
--- a/posthog/models/event/sql.py
+++ b/posthog/models/event/sql.py
@@ -4,6 +4,8 @@ from posthog.clickhouse.base_sql import COPY_ROWS_BETWEEN_TEAMS_BASE_SQL
 from posthog.clickhouse.indexes import index_by_kafka_timestamp
 from posthog.clickhouse.kafka_engine import (
     KAFKA_COLUMNS,
+    KAFKA_COLUMNS_WITH_PARTITION,
+    KAFKA_TIMESTAMP_MS_COLUMN,
     STORAGE_POLICY,
     kafka_engine,
     trim_quotes_expr,
@@ -18,7 +20,7 @@ from posthog.kafka_client.topics import KAFKA_EVENTS_JSON
 
 EVENTS_DATA_TABLE = lambda: "sharded_events"
 WRITABLE_EVENTS_DATA_TABLE = lambda: "writable_events"
-
+EVENTS_RECENT_DATA_TABLE = lambda: "events_recent"
 TRUNCATE_EVENTS_TABLE_SQL = (
     lambda: f"TRUNCATE TABLE IF EXISTS {EVENTS_DATA_TABLE()} ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'"
 )
@@ -185,6 +187,86 @@ FROM {database}.kafka_events_json
     )
 )
 
+
+KAFKA_EVENTS_RECENT_TABLE_JSON_SQL = lambda: (
+    EVENTS_TABLE_BASE_SQL
+    + """
+    SETTINGS kafka_skip_broken_messages = 100
+"""
+).format(
+    table_name="kafka_events_recent_json",
+    cluster=settings.CLICKHOUSE_CLUSTER,
+    engine=kafka_engine(topic=KAFKA_EVENTS_JSON, group="group1_recent"),
+    extra_fields="",
+    materialized_columns="",
+    indexes="",
+)
+
+EVENTS_RECENT_TABLE_JSON_MV_SQL = (
+    lambda: """
+CREATE MATERIALIZED VIEW IF NOT EXISTS events_recent_json_mv ON CLUSTER '{cluster}'
+TO {database}.{target_table}
+AS SELECT
+uuid,
+event,
+properties,
+timestamp,
+team_id,
+distinct_id,
+elements_chain,
+created_at,
+person_id,
+person_created_at,
+person_properties,
+group0_properties,
+group1_properties,
+group2_properties,
+group3_properties,
+group4_properties,
+group0_created_at,
+group1_created_at,
+group2_created_at,
+group3_created_at,
+group4_created_at,
+person_mode,
+_timestamp,
+_timestamp_ms,
+_offset,
+_partition
+FROM {database}.kafka_events_recent_json
+""".format(
+        target_table=EVENTS_RECENT_DATA_TABLE(),
+        cluster=settings.CLICKHOUSE_CLUSTER,
+        database=settings.CLICKHOUSE_DATABASE,
+    )
+)
+
+EVENTS_RECENT_TABLE_SQL = lambda: (
+    EVENTS_TABLE_BASE_SQL
+    + """PARTITION BY toStartOfHour(_timestamp)
+ORDER BY (team_id, toStartOfHour(_timestamp), event, cityHash64(distinct_id), cityHash64(uuid))
+TTL _timestamp + INTERVAL 7 DAY
+{storage_policy}
+"""
+).format(
+    table_name=EVENTS_RECENT_DATA_TABLE(),
+    cluster=settings.CLICKHOUSE_CLUSTER,
+    engine=ReplacingMergeTree(EVENTS_RECENT_DATA_TABLE(), ver="_timestamp"),
+    extra_fields=KAFKA_COLUMNS_WITH_PARTITION + INSERTED_AT_COLUMN + f", {KAFKA_TIMESTAMP_MS_COLUMN}",
+    materialized_columns="",
+    indexes="",
+    storage_policy=STORAGE_POLICY(),
+)
+
+DISTRIBUTED_EVENTS_RECENT_TABLE_SQL = lambda: EVENTS_TABLE_BASE_SQL.format(
+    table_name="distributed_events_recent",
+    cluster=settings.CLICKHOUSE_CLUSTER,
+    engine=Distributed(data_table=EVENTS_RECENT_DATA_TABLE(), sharding_key="sipHash64(distinct_id)"),
+    extra_fields=KAFKA_COLUMNS_WITH_PARTITION + INSERTED_AT_COLUMN + f", {KAFKA_TIMESTAMP_MS_COLUMN}",
+    materialized_columns="",
+    indexes="",
+)
+
 # Distributed engine tables are only created if CLICKHOUSE_REPLICATED
 # This table is responsible for writing to sharded_events based on a sharding key.
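-- 
Reviewer note (annotation, not part of the patch): the four statements created by
migration 0086 form the usual ClickHouse ingestion chain. kafka_events_recent_json
consumes the KAFKA_EVENTS_JSON topic under its own consumer group (group1_recent, so
its offsets are independent of the main pipeline) and tolerates up to 100 unparsable
messages per block; events_recent_json_mv drains it into the events_recent MergeTree
table; distributed_events_recent gives queries a cluster-wide view over the shards,
sharded by sipHash64(distinct_id). As a rough sketch of the DDL that
EVENTS_RECENT_TABLE_SQL() renders -- hypothetical and abbreviated, since the full
column list comes from EVENTS_TABLE_BASE_SQL (untouched by this patch) and the
cluster, engine, and storage-policy names come from Django settings:

    -- Illustrative rendering only; column list and engine arguments are
    -- assumptions based on EVENTS_TABLE_BASE_SQL, not the exact output.
    CREATE TABLE IF NOT EXISTS events_recent ON CLUSTER 'posthog'
    (
        uuid UUID,
        event VARCHAR,
        timestamp DateTime64(6, 'UTC'),
        team_id Int64,
        distinct_id VARCHAR
        -- ... remaining event/person/group columns elided ...
        -- extra_fields = KAFKA_COLUMNS_WITH_PARTITION + INSERTED_AT_COLUMN
        --                + KAFKA_TIMESTAMP_MS_COLUMN:
        , _timestamp DateTime
        , _offset UInt64
        , _partition UInt64
        , inserted_at Nullable(DateTime64(6, 'UTC'))
        , _timestamp_ms DateTime64
    )
    ENGINE = ReplacingMergeTree(_timestamp)  -- replicated variant in production
    PARTITION BY toStartOfHour(_timestamp)   -- hourly parts keep TTL drops cheap
    ORDER BY (team_id, toStartOfHour(_timestamp), event, cityHash64(distinct_id), cityHash64(uuid))
    TTL _timestamp + INTERVAL 7 DAY          -- rows expire seven days after ingestion

Note that partitioning and TTL both key on the Kafka ingest time (_timestamp), not
the event timestamp, so retention is seven days from ingestion, and ver="_timestamp"
means the ReplacingMergeTree keeps the most recently ingested row among duplicates
sharing the same ORDER BY key.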