0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-11-21 13:39:22 +01:00

more cleanup etc

This commit is contained in:
Ted Kaemming 2024-11-18 21:51:14 -08:00
parent e4b753a0cd
commit 3381ea4529

View File

@ -1,12 +1,12 @@
from __future__ import annotations
from concurrent.futures import wait
from concurrent.futures import Future, as_completed
from functools import partial
import re
from collections.abc import Iterator
from collections.abc import Iterable, Iterator
from dataclasses import dataclass, replace
from datetime import timedelta
from typing import Literal, NamedTuple, cast
from typing import Any, Literal, NamedTuple, cast
from clickhouse_driver import Client
from django.utils.timezone import now
@ -22,6 +22,11 @@ from posthog.models.utils import generate_random_short_suffix
from posthog.settings import CLICKHOUSE_CLUSTER, CLICKHOUSE_DATABASE, TEST
def check_all(futures: Iterable[Future[Any]]) -> None:
for future in as_completed(futures):
future.result()
@dataclass
class Column:
table: str
@ -73,7 +78,7 @@ class Index:
def drop(self, client: Client) -> None:
client.execute(
f"ALTER TABLE {self.table} DROP COLUMN IF EXISTS {self.name}",
f"ALTER TABLE {self.table} DROP INDEX IF EXISTS {self.name}",
settings={"alter_sync": 2 if TEST else 1},
)
@ -186,7 +191,6 @@ def get_on_cluster_clause_for_table(table: TableWithProperties) -> str:
def _get_cluster() -> ClickhouseCluster:
# TODO: this should probably check to see if the column has been disabled first
with make_ch_pool().get_client() as client:
return ClickhouseCluster(client)
@ -203,6 +207,55 @@ def _get_table_info(table: TableWithProperties) -> tuple[str | None, str]:
return dist_table, data_table
def _create_materialized_data_columns(
table, column_name, table_column, property_name, create_minmax_index: bool, comment: str | None, client
) -> None:
# execute on hosts
client.execute(
f"""
ALTER TABLE {table}
ADD COLUMN IF NOT EXISTS
{column_name} VARCHAR MATERIALIZED {TRIM_AND_EXTRACT_PROPERTY.format(table_column=table_column)}
""",
{"property": property_name},
settings={"alter_sync": 2 if TEST else 1},
)
if create_minmax_index:
index_name = f"minmax_{column_name}"
client.execute(
f"""
ALTER TABLE {table}
ADD INDEX IF NOT EXISTS {index_name} {column_name}
TYPE minmax GRANULARITY 1
""",
settings={"alter_sync": 2 if TEST else 1},
)
if comment is not None:
client.execute(
f"ALTER TABLE {table} COMMENT COLUMN {column_name} %(comment)s",
{"comment": comment},
settings={"alter_sync": 2 if TEST else 1},
)
def _create_materialized_dist_columns(table, column_name, comment: str, client) -> None:
client.execute(
f"""
ALTER TABLE {table}
ADD COLUMN IF NOT EXISTS
{column_name} VARCHAR
""",
settings={"alter_sync": 2 if TEST else 1},
)
client.execute(
f"ALTER TABLE {table} COMMENT COLUMN {column_name} %(comment)s",
{"comment": comment},
settings={"alter_sync": 2 if TEST else 1},
)
def materialize(
table: TableWithProperties,
property: PropertyName,
@ -221,51 +274,28 @@ def materialize(
column_name = column_name or _materialized_column_name(table, property, table_column)
cluster = _get_cluster()
dist_table, data_table = _get_table_info(table)
# execute on hosts
sync_execute(
f"""
ALTER TABLE {data_table}
ADD COLUMN IF NOT EXISTS
{column_name} VARCHAR MATERIALIZED {TRIM_AND_EXTRACT_PROPERTY.format(table_column=table_column)}
""",
{"property": property},
settings={"alter_sync": 2 if TEST else 1},
comment = MaterializedColumnDetails(table_column, property, is_disabled=False).as_column_comment()
check_all(
cluster.map_hosts(
partial(
_create_materialized_data_columns,
data_table,
column_name,
table_column,
property,
create_minmax_index,
comment if dist_table is not None else None,
)
).values()
)
if create_minmax_index:
index_name = f"minmax_{column_name}"
sync_execute(
f"""
ALTER TABLE {data_table}
ADD INDEX IF NOT EXISTS {index_name} {column_name}
TYPE minmax GRANULARITY 1
""",
settings={"alter_sync": 2 if TEST else 1},
)
if dist_table is None:
sync_execute(
f"ALTER TABLE {data_table} COMMENT COLUMN {column_name} %(comment)s",
{"comment": MaterializedColumnDetails(table_column, property, is_disabled=False).as_column_comment()},
settings={"alter_sync": 2 if TEST else 1},
)
# execute on shards
if dist_table is not None:
sync_execute(
f"""
ALTER TABLE {dist_table}
ADD COLUMN IF NOT EXISTS
{column_name} VARCHAR
""",
settings={"alter_sync": 2 if TEST else 1},
)
sync_execute(
f"ALTER TABLE {dist_table} COMMENT COLUMN {column_name} %(comment)s",
{"comment": MaterializedColumnDetails(table_column, property, is_disabled=False).as_column_comment()},
settings={"alter_sync": 2 if TEST else 1},
check_all(
cluster.map_shards(partial(_create_materialized_dist_columns, dist_table, column_name, comment)).values()
)
return column_name
@ -293,7 +323,7 @@ def update_column_is_disabled(table: TablesWithMaterializedColumns, column_name:
else:
results = cluster.map_shards(partial(_set_column_comment, data_table, column_name, comment)).values()
wait(results)
check_all(results)
def _drop_all_data(table, column_name, client):
@ -308,9 +338,9 @@ def drop_column(table: TablesWithMaterializedColumns, column_name: str) -> None:
# TODO: this should probably check to see if the column has been disabled first
if dist_table is not None:
wait(cluster.map_hosts(Column(dist_table, column_name).drop_if_exists).values())
check_all(cluster.map_hosts(Column(dist_table, column_name).drop_if_exists).values())
wait(cluster.map_shards(partial(_drop_all_data, data_table, column_name)).values())
check_all(cluster.map_shards(partial(_drop_all_data, data_table, column_name)).values())
def backfill_materialized_columns(