diff --git a/ee/clickhouse/materialized_columns/columns.py b/ee/clickhouse/materialized_columns/columns.py index 317a100cd9b..6744f118f41 100644 --- a/ee/clickhouse/materialized_columns/columns.py +++ b/ee/clickhouse/materialized_columns/columns.py @@ -1,12 +1,12 @@ from __future__ import annotations -from concurrent.futures import wait +from concurrent.futures import Future, as_completed from functools import partial import re -from collections.abc import Iterator +from collections.abc import Iterable, Iterator from dataclasses import dataclass, replace from datetime import timedelta -from typing import Literal, NamedTuple, cast +from typing import Any, Literal, NamedTuple, cast from clickhouse_driver import Client from django.utils.timezone import now @@ -22,6 +22,11 @@ from posthog.models.utils import generate_random_short_suffix from posthog.settings import CLICKHOUSE_CLUSTER, CLICKHOUSE_DATABASE, TEST +def check_all(futures: Iterable[Future[Any]]) -> None: + for future in as_completed(futures): + future.result() + + @dataclass class Column: table: str @@ -73,7 +78,7 @@ class Index: def drop(self, client: Client) -> None: client.execute( - f"ALTER TABLE {self.table} DROP COLUMN IF EXISTS {self.name}", + f"ALTER TABLE {self.table} DROP INDEX IF EXISTS {self.name}", settings={"alter_sync": 2 if TEST else 1}, ) @@ -186,7 +191,6 @@ def get_on_cluster_clause_for_table(table: TableWithProperties) -> str: def _get_cluster() -> ClickhouseCluster: - # TODO: this should probably check to see if the column has been disabled first with make_ch_pool().get_client() as client: return ClickhouseCluster(client) @@ -203,6 +207,55 @@ def _get_table_info(table: TableWithProperties) -> tuple[str | None, str]: return dist_table, data_table +def _create_materialized_data_columns( + table, column_name, table_column, property_name, create_minmax_index: bool, comment: str | None, client +) -> None: + # execute on hosts + client.execute( + f""" + ALTER TABLE {table} + ADD COLUMN IF NOT EXISTS + {column_name} VARCHAR MATERIALIZED {TRIM_AND_EXTRACT_PROPERTY.format(table_column=table_column)} + """, + {"property": property_name}, + settings={"alter_sync": 2 if TEST else 1}, + ) + + if create_minmax_index: + index_name = f"minmax_{column_name}" + client.execute( + f""" + ALTER TABLE {table} + ADD INDEX IF NOT EXISTS {index_name} {column_name} + TYPE minmax GRANULARITY 1 + """, + settings={"alter_sync": 2 if TEST else 1}, + ) + + if comment is not None: + client.execute( + f"ALTER TABLE {table} COMMENT COLUMN {column_name} %(comment)s", + {"comment": comment}, + settings={"alter_sync": 2 if TEST else 1}, + ) + + +def _create_materialized_dist_columns(table, column_name, comment: str, client) -> None: + client.execute( + f""" + ALTER TABLE {table} + ADD COLUMN IF NOT EXISTS + {column_name} VARCHAR + """, + settings={"alter_sync": 2 if TEST else 1}, + ) + client.execute( + f"ALTER TABLE {table} COMMENT COLUMN {column_name} %(comment)s", + {"comment": comment}, + settings={"alter_sync": 2 if TEST else 1}, + ) + + def materialize( table: TableWithProperties, property: PropertyName, @@ -221,51 +274,28 @@ def materialize( column_name = column_name or _materialized_column_name(table, property, table_column) + cluster = _get_cluster() dist_table, data_table = _get_table_info(table) - # execute on hosts - sync_execute( - f""" - ALTER TABLE {data_table} - ADD COLUMN IF NOT EXISTS - {column_name} VARCHAR MATERIALIZED {TRIM_AND_EXTRACT_PROPERTY.format(table_column=table_column)} - """, - {"property": property}, - settings={"alter_sync": 2 if TEST else 1}, + comment = MaterializedColumnDetails(table_column, property, is_disabled=False).as_column_comment() + + check_all( + cluster.map_hosts( + partial( + _create_materialized_data_columns, + data_table, + column_name, + table_column, + property, + create_minmax_index, + comment if dist_table is not None else None, + ) + ).values() ) - if create_minmax_index: - index_name = f"minmax_{column_name}" - sync_execute( - f""" - ALTER TABLE {data_table} - ADD INDEX IF NOT EXISTS {index_name} {column_name} - TYPE minmax GRANULARITY 1 - """, - settings={"alter_sync": 2 if TEST else 1}, - ) - - if dist_table is None: - sync_execute( - f"ALTER TABLE {data_table} COMMENT COLUMN {column_name} %(comment)s", - {"comment": MaterializedColumnDetails(table_column, property, is_disabled=False).as_column_comment()}, - settings={"alter_sync": 2 if TEST else 1}, - ) - - # execute on shards if dist_table is not None: - sync_execute( - f""" - ALTER TABLE {dist_table} - ADD COLUMN IF NOT EXISTS - {column_name} VARCHAR - """, - settings={"alter_sync": 2 if TEST else 1}, - ) - sync_execute( - f"ALTER TABLE {dist_table} COMMENT COLUMN {column_name} %(comment)s", - {"comment": MaterializedColumnDetails(table_column, property, is_disabled=False).as_column_comment()}, - settings={"alter_sync": 2 if TEST else 1}, + check_all( + cluster.map_shards(partial(_create_materialized_dist_columns, dist_table, column_name, comment)).values() ) return column_name @@ -293,7 +323,7 @@ def update_column_is_disabled(table: TablesWithMaterializedColumns, column_name: else: results = cluster.map_shards(partial(_set_column_comment, data_table, column_name, comment)).values() - wait(results) + check_all(results) def _drop_all_data(table, column_name, client): @@ -308,9 +338,9 @@ def drop_column(table: TablesWithMaterializedColumns, column_name: str) -> None: # TODO: this should probably check to see if the column has been disabled first if dist_table is not None: - wait(cluster.map_hosts(Column(dist_table, column_name).drop_if_exists).values()) + check_all(cluster.map_hosts(Column(dist_table, column_name).drop_if_exists).values()) - wait(cluster.map_shards(partial(_drop_all_data, data_table, column_name)).values()) + check_all(cluster.map_shards(partial(_drop_all_data, data_table, column_name)).values()) def backfill_materialized_columns(