
chore: use posthog single-shard cluster for distributed_events_recent (#26339)

Daesgar authored on 2024-11-22 16:52:35 +01:00; committed by GitHub
parent 57eafda201
commit 3060b29d3e
5 changed files with 31 additions and 3 deletions


@@ -712,6 +712,14 @@
             </replica>
         </shard>
     </posthog>
+    <posthog_single_shard>
+        <shard>
+            <replica>
+                <host>localhost</host>
+                <port>9000</port>
+            </replica>
+        </shard>
+    </posthog_single_shard>
 </remote_servers>
 <!-- The list of hosts allowed to use in URL-related storage engines and table functions.
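Clusters declared under <remote_servers> show up in the system.clusters table on every node, which is the quickest way to confirm the new definition is live. A minimal verification sketch in Python, assuming the clickhouse-driver package and a local server on the native port (neither assumption is part of this commit):

from clickhouse_driver import Client

# Connect over the native protocol; host/port mirror the <replica> entry above.
client = Client(host="localhost", port=9000)

# system.clusters has one row per replica of every configured cluster.
rows = client.execute(
    "SELECT cluster, shard_num, replica_num, host_name, port "
    "FROM system.clusters WHERE cluster = 'posthog_single_shard'"
)
for cluster, shard_num, replica_num, host_name, port in rows:
    print(cluster, shard_num, replica_num, host_name, port)

A single row with shard_num = 1 and replica_num = 1 is the expected result for this one-shard, one-replica layout.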


@@ -0,0 +1,12 @@
+from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
+from posthog.models.event.sql import (
+    DISTRIBUTED_EVENTS_RECENT_TABLE_SQL,
+)
+from posthog.settings import CLICKHOUSE_CLUSTER
+
+operations = [
+    run_sql_with_exceptions(f"DROP TABLE IF EXISTS distributed_events_recent ON CLUSTER '{CLICKHOUSE_CLUSTER}'"),
+    run_sql_with_exceptions(DISTRIBUTED_EVENTS_RECENT_TABLE_SQL()),
+]
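Since a ClickHouse Distributed table stores no data of its own, dropping and recreating distributed_events_recent is a cheap way to repoint it at the new cluster. A self-contained sketch of the pattern, with run_sql_with_exceptions stubbed out (the real helper in posthog.clickhouse.client.migration_tools builds a migration operation; the stub below only records the statement):

# Stub standing in for the real migration helper; illustration only.
def run_sql_with_exceptions(sql: str) -> str:
    return sql

CLICKHOUSE_CLUSTER = "posthog"  # placeholder for the real setting

operations = [
    # The DROP runs ON CLUSTER against the existing multi-shard cluster, so
    # the old Distributed table disappears from every node...
    run_sql_with_exceptions(
        f"DROP TABLE IF EXISTS distributed_events_recent ON CLUSTER '{CLICKHOUSE_CLUSTER}'"
    ),
    # ...while the recreated table (rendered by DISTRIBUTED_EVENTS_RECENT_TABLE_SQL
    # in the real migration) routes through the new single-shard cluster.
    run_sql_with_exceptions("CREATE TABLE distributed_events_recent ..."),
]

for statement in operations:
    print(statement)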


@@ -75,9 +75,12 @@ class AggregatingMergeTree(MergeTreeEngine):
 class Distributed:
-    def __init__(self, data_table: str, sharding_key: str):
+    def __init__(self, data_table: str, sharding_key: str, cluster: str = settings.CLICKHOUSE_CLUSTER):
         self.data_table = data_table
         self.sharding_key = sharding_key
+        self.cluster = cluster
 
     def __str__(self):
-        return f"Distributed('{settings.CLICKHOUSE_CLUSTER}', '{settings.CLICKHOUSE_DATABASE}', '{self.data_table}', {self.sharding_key})"
+        return (
+            f"Distributed('{self.cluster}', '{settings.CLICKHOUSE_DATABASE}', '{self.data_table}', {self.sharding_key})"
+        )
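Pulled out into standalone form, the change keeps every existing call site working (the new parameter defaults to the main cluster) while letting individual tables opt into another cluster. A minimal sketch with a stand-in settings object, not the real posthog.settings module:

class _Settings:
    CLICKHOUSE_CLUSTER = "posthog"
    CLICKHOUSE_DATABASE = "default"
    CLICKHOUSE_SINGLE_SHARD_CLUSTER = "posthog_single_shard"

settings = _Settings()

class Distributed:
    def __init__(self, data_table: str, sharding_key: str, cluster: str = settings.CLICKHOUSE_CLUSTER):
        self.data_table = data_table
        self.sharding_key = sharding_key
        self.cluster = cluster

    def __str__(self):
        return (
            f"Distributed('{self.cluster}', '{settings.CLICKHOUSE_DATABASE}', '{self.data_table}', {self.sharding_key})"
        )

# Default: unchanged behavior, routed through the main cluster.
print(Distributed("events_recent", "sipHash64(distinct_id)"))
# Opt-in: the engine string now names the single-shard cluster instead.
print(Distributed("events_recent", "sipHash64(distinct_id)",
                  cluster=settings.CLICKHOUSE_SINGLE_SHARD_CLUSTER))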


@@ -261,7 +261,11 @@ TTL _timestamp + INTERVAL 7 DAY
 DISTRIBUTED_EVENTS_RECENT_TABLE_SQL = lambda: EVENTS_TABLE_BASE_SQL.format(
     table_name="distributed_events_recent",
     cluster=settings.CLICKHOUSE_CLUSTER,
-    engine=Distributed(data_table=EVENTS_RECENT_DATA_TABLE(), sharding_key="sipHash64(distinct_id)"),
+    engine=Distributed(
+        data_table=EVENTS_RECENT_DATA_TABLE(),
+        sharding_key="sipHash64(distinct_id)",
+        cluster=settings.CLICKHOUSE_SINGLE_SHARD_CLUSTER,
+    ),
     extra_fields=KAFKA_COLUMNS_WITH_PARTITION + INSERTED_AT_COLUMN + f", {KAFKA_TIMESTAMP_MS_COLUMN}",
     materialized_columns="",
     indexes="",
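Note the asymmetry: cluster=settings.CLICKHOUSE_CLUSTER still fills the ON CLUSTER clause of the CREATE, so the Distributed table itself exists on every node of the main cluster, while the engine's new cluster argument decides where reads and writes are routed. A simplified sketch of the .format() assembly, using a stand-in template far smaller than the real EVENTS_TABLE_BASE_SQL and placeholder names throughout:

# Stand-in template; the real EVENTS_TABLE_BASE_SQL carries the full events schema.
EVENTS_TABLE_BASE_SQL = """\
CREATE TABLE IF NOT EXISTS {table_name} ON CLUSTER '{cluster}'
(
    uuid UUID,
    distinct_id String{extra_fields}
) ENGINE = {engine}
{materialized_columns}{indexes}
"""

print(
    EVENTS_TABLE_BASE_SQL.format(
        table_name="distributed_events_recent",
        cluster="posthog",  # placeholder for settings.CLICKHOUSE_CLUSTER
        engine="Distributed('posthog_single_shard', 'default', 'events_recent', sipHash64(distinct_id))",
        extra_fields="",
        materialized_columns="",
        indexes="",
    )
)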


@@ -154,6 +154,7 @@ CLICKHOUSE_CA: str | None = os.getenv("CLICKHOUSE_CA", None)
 CLICKHOUSE_SECURE: bool = get_from_env("CLICKHOUSE_SECURE", not TEST and not DEBUG, type_cast=str_to_bool)
 CLICKHOUSE_VERIFY: bool = get_from_env("CLICKHOUSE_VERIFY", True, type_cast=str_to_bool)
 CLICKHOUSE_ENABLE_STORAGE_POLICY: bool = get_from_env("CLICKHOUSE_ENABLE_STORAGE_POLICY", False, type_cast=str_to_bool)
+CLICKHOUSE_SINGLE_SHARD_CLUSTER: str = os.getenv("CLICKHOUSE_SINGLE_SHARD_CLUSTER", "posthog_single_shard")
 CLICKHOUSE_CONN_POOL_MIN: int = get_from_env("CLICKHOUSE_CONN_POOL_MIN", 20, type_cast=int)
 CLICKHOUSE_CONN_POOL_MAX: int = get_from_env("CLICKHOUSE_CONN_POOL_MAX", 1000, type_cast=int)
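Like the neighboring CLICKHOUSE_* settings, the new name is environment-driven, so a deployment whose single-shard cluster is named differently can repoint it without a code change. A small illustration with a hypothetical cluster name:

import os

# Hypothetical override; in practice this would be set in the deployment
# environment before posthog.settings is imported.
os.environ["CLICKHOUSE_SINGLE_SHARD_CLUSTER"] = "posthog_single_shard_eu"

CLICKHOUSE_SINGLE_SHARD_CLUSTER: str = os.getenv(
    "CLICKHOUSE_SINGLE_SHARD_CLUSTER", "posthog_single_shard"
)
print(CLICKHOUSE_SINGLE_SHARD_CLUSTER)  # -> posthog_single_shard_eu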