diff --git a/ee/benchmarks/benchmarks.py b/ee/benchmarks/benchmarks.py index 45c6aeaa85b..1784cae10a4 100644 --- a/ee/benchmarks/benchmarks.py +++ b/ee/benchmarks/benchmarks.py @@ -28,13 +28,17 @@ from posthog.models.filters.filter import Filter from posthog.models.property import PropertyName, TableWithProperties from posthog.constants import FunnelCorrelationType -MATERIALIZED_PROPERTIES: list[tuple[TableWithProperties, PropertyName]] = [ - ("events", "$host"), - ("events", "$current_url"), - ("events", "$event_type"), - ("person", "email"), - ("person", "$browser"), -] +MATERIALIZED_PROPERTIES: dict[TableWithProperties, list[PropertyName]] = { + "events": [ + "$current_url", + "$event_type", + "$host", + ], + "person": [ + "$browser", + "email", + ], +} DATE_RANGE = {"date_from": "2021-01-01", "date_to": "2021-10-01", "interval": "week"} SHORT_DATE_RANGE = { @@ -766,14 +770,16 @@ class QuerySuite: get_person_property_values_for_key("$browser", self.team) def setup(self): - for table, property in MATERIALIZED_PROPERTIES: - if (property, "properties") not in get_materialized_columns(table): - materialize(table, property) - backfill_materialized_columns( - table, - [(property, "properties")], - backfill_period=timedelta(days=1_000), - ) + for table, properties in MATERIALIZED_PROPERTIES.items(): + existing_materialized_columns = get_materialized_columns(table) + for property in properties: + if (property, "properties") not in existing_materialized_columns: + materialize(table, property) + backfill_materialized_columns( + table, + [(property, "properties")], + backfill_period=timedelta(days=1_000), + ) # :TRICKY: Data in benchmark servers has ID=2 team = Team.objects.filter(id=2).first() diff --git a/ee/benchmarks/helpers.py b/ee/benchmarks/helpers.py index 1a22550acee..285a1dc97ee 100644 --- a/ee/benchmarks/helpers.py +++ b/ee/benchmarks/helpers.py @@ -14,7 +14,7 @@ import django # noqa: E402 django.setup() -from ee.clickhouse.materialized_columns.columns import get_materialized_columns # noqa: E402 +from posthog.clickhouse.materialized_columns import get_enabled_materialized_columns # noqa: E402 from posthog import client # noqa: E402 from posthog.clickhouse.query_tagging import reset_query_tags, tag_queries # noqa: E402 from posthog.models.utils import UUIDT # noqa: E402 @@ -71,9 +71,9 @@ def benchmark_clickhouse(fn): @contextmanager def no_materialized_columns(): "Allows running a function without any materialized columns being used in query" - get_materialized_columns._cache = { + get_enabled_materialized_columns._cache = { ("events",): (now(), {}), ("person",): (now(), {}), } yield - get_materialized_columns._cache = {} + get_enabled_materialized_columns._cache = {} diff --git a/ee/clickhouse/materialized_columns/analyze.py b/ee/clickhouse/materialized_columns/analyze.py index d19a282117e..43a1e832569 100644 --- a/ee/clickhouse/materialized_columns/analyze.py +++ b/ee/clickhouse/materialized_columns/analyze.py @@ -1,3 +1,4 @@ +from collections import defaultdict import re from datetime import timedelta from typing import Optional @@ -177,11 +178,17 @@ def materialize_properties_task( if columns_to_materialize is None: columns_to_materialize = _analyze(time_to_analyze_hours, min_query_time, team_id_to_analyze) - result = [] - for suggestion in columns_to_materialize: - table, table_column, property_name = suggestion - if (property_name, table_column) not in get_materialized_columns(table): - result.append(suggestion) + + columns_by_table: dict[TableWithProperties, list[tuple[TableColumn, PropertyName]]] = defaultdict(list) + for table, table_column, property_name in columns_to_materialize: + columns_by_table[table].append((table_column, property_name)) + + result: list[Suggestion] = [] + for table, columns in columns_by_table.items(): + existing_materialized_columns = get_materialized_columns(table) + for table_column, property_name in columns: + if (property_name, table_column) not in existing_materialized_columns: + result.append((table, table_column, property_name)) if len(result) > 0: logger.info(f"Calculated columns that could be materialized. count={len(result)}") diff --git a/ee/clickhouse/materialized_columns/columns.py b/ee/clickhouse/materialized_columns/columns.py index f99f3a48f09..30814859712 100644 --- a/ee/clickhouse/materialized_columns/columns.py +++ b/ee/clickhouse/materialized_columns/columns.py @@ -1,11 +1,14 @@ +from __future__ import annotations + import re +from collections.abc import Iterator +from dataclasses import dataclass, replace from datetime import timedelta -from typing import Literal, cast +from typing import Literal, NamedTuple, cast from clickhouse_driver.errors import ServerException from django.utils.timezone import now -from posthog.cache_utils import cache_for from posthog.clickhouse.kafka_engine import trim_quotes_expr from posthog.clickhouse.materialized_columns import ColumnName, TablesWithMaterializedColumns from posthog.client import sync_execute @@ -30,37 +33,103 @@ SHORT_TABLE_COLUMN_NAME = { } -@cache_for(timedelta(minutes=15)) +class MaterializedColumn(NamedTuple): + name: ColumnName + details: MaterializedColumnDetails + + @staticmethod + def get_all(table: TablesWithMaterializedColumns) -> Iterator[MaterializedColumn]: + rows = sync_execute( + """ + SELECT name, comment + FROM system.columns + WHERE database = %(database)s + AND table = %(table)s + AND comment LIKE '%%column_materializer::%%' + AND comment not LIKE '%%column_materializer::elements_chain::%%' + """, + {"database": CLICKHOUSE_DATABASE, "table": table}, + ) + + for name, comment in rows: + yield MaterializedColumn(name, MaterializedColumnDetails.from_column_comment(comment)) + + @staticmethod + def get(table: TablesWithMaterializedColumns, column_name: ColumnName) -> MaterializedColumn: + # TODO: It would be more efficient to push the filter here down into the `get_all` query, but that would require + # more a sophisticated method of constructing queries than we have right now, and this data set should be small + # enough that this doesn't really matter (at least as of writing.) + columns = [column for column in MaterializedColumn.get_all(table) if column.name == column_name] + match columns: + case []: + raise ValueError("column does not exist") + case [column]: + return column + case _: + # this should never happen (column names are unique within a table) and suggests an error in the query + raise ValueError(f"got {len(columns)} columns, expected 0 or 1") + + +@dataclass(frozen=True) +class MaterializedColumnDetails: + table_column: TableColumn + property_name: PropertyName + is_disabled: bool + + COMMENT_PREFIX = "column_materializer" + COMMENT_SEPARATOR = "::" + COMMENT_DISABLED_MARKER = "disabled" + + def as_column_comment(self) -> str: + bits = [self.COMMENT_PREFIX, self.table_column, self.property_name] + if self.is_disabled: + bits.append(self.COMMENT_DISABLED_MARKER) + return self.COMMENT_SEPARATOR.join(bits) + + @classmethod + def from_column_comment(cls, comment: str) -> MaterializedColumnDetails: + match comment.split(cls.COMMENT_SEPARATOR, 3): + # Old style comments have the format "column_materializer::property", dealing with the default table column. + case [cls.COMMENT_PREFIX, property_name]: + return MaterializedColumnDetails(DEFAULT_TABLE_COLUMN, property_name, is_disabled=False) + # Otherwise, it's "column_materializer::table_column::property" for columns that are active. + case [cls.COMMENT_PREFIX, table_column, property_name]: + return MaterializedColumnDetails(cast(TableColumn, table_column), property_name, is_disabled=False) + # Columns that are marked as disabled have an extra trailer indicating their status. + case [cls.COMMENT_PREFIX, table_column, property_name, cls.COMMENT_DISABLED_MARKER]: + return MaterializedColumnDetails(cast(TableColumn, table_column), property_name, is_disabled=True) + case _: + raise ValueError(f"unexpected comment format: {comment!r}") + + def get_materialized_columns( table: TablesWithMaterializedColumns, + exclude_disabled_columns: bool = False, ) -> dict[tuple[PropertyName, TableColumn], ColumnName]: - rows = sync_execute( - """ - SELECT comment, name - FROM system.columns - WHERE database = %(database)s - AND table = %(table)s - AND comment LIKE '%%column_materializer::%%' - AND comment not LIKE '%%column_materializer::elements_chain::%%' - """, - {"database": CLICKHOUSE_DATABASE, "table": table}, - ) - if rows and get_instance_setting("MATERIALIZED_COLUMNS_ENABLED"): - return {_extract_property(comment): column_name for comment, column_name in rows} - else: + if not get_instance_setting("MATERIALIZED_COLUMNS_ENABLED"): return {} + return { + (column.details.property_name, column.details.table_column): column.name + for column in MaterializedColumn.get_all(table) + if not (exclude_disabled_columns and column.details.is_disabled) + } + + +def get_on_cluster_clause_for_table(table: TableWithProperties) -> str: + return f"ON CLUSTER '{CLICKHOUSE_CLUSTER}'" if table == "events" else "" + def materialize( table: TableWithProperties, property: PropertyName, - column_name=None, + column_name: ColumnName | None = None, table_column: TableColumn = DEFAULT_TABLE_COLUMN, create_minmax_index=not TEST, -) -> None: - if (property, table_column) in get_materialized_columns(table, use_cache=False): +) -> ColumnName | None: + if (property, table_column) in get_materialized_columns(table): if TEST: - return + return None raise ValueError(f"Property already materialized. table={table}, property={property}, column={table_column}") @@ -68,14 +137,12 @@ def materialize( raise ValueError(f"Invalid table_column={table_column} for materialisation") column_name = column_name or _materialized_column_name(table, property, table_column) - # :TRICKY: On cloud, we ON CLUSTER updates to events/sharded_events but not to persons. Why? ¯\_(ツ)_/¯ - execute_on_cluster = f"ON CLUSTER '{CLICKHOUSE_CLUSTER}'" if table == "events" else "" + on_cluster = get_on_cluster_clause_for_table(table) if table == "events": sync_execute( f""" - ALTER TABLE sharded_{table} - {execute_on_cluster} + ALTER TABLE sharded_{table} {on_cluster} ADD COLUMN IF NOT EXISTS {column_name} VARCHAR MATERIALIZED {TRIM_AND_EXTRACT_PROPERTY.format(table_column=table_column)} """, @@ -84,8 +151,7 @@ def materialize( ) sync_execute( f""" - ALTER TABLE {table} - {execute_on_cluster} + ALTER TABLE {table} {on_cluster} ADD COLUMN IF NOT EXISTS {column_name} VARCHAR """, @@ -94,8 +160,7 @@ def materialize( else: sync_execute( f""" - ALTER TABLE {table} - {execute_on_cluster} + ALTER TABLE {table} {on_cluster} ADD COLUMN IF NOT EXISTS {column_name} VARCHAR MATERIALIZED {TRIM_AND_EXTRACT_PROPERTY.format(table_column=table_column)} """, @@ -104,27 +169,58 @@ def materialize( ) sync_execute( - f"ALTER TABLE {table} {execute_on_cluster} COMMENT COLUMN {column_name} %(comment)s", - {"comment": f"column_materializer::{table_column}::{property}"}, + f"ALTER TABLE {table} {on_cluster} COMMENT COLUMN {column_name} %(comment)s", + {"comment": MaterializedColumnDetails(table_column, property, is_disabled=False).as_column_comment()}, settings={"alter_sync": 2 if TEST else 1}, ) if create_minmax_index: add_minmax_index(table, column_name) + return column_name -def add_minmax_index(table: TablesWithMaterializedColumns, column_name: str): + +def update_column_is_disabled(table: TablesWithMaterializedColumns, column_name: str, is_disabled: bool) -> None: + details = replace( + MaterializedColumn.get(table, column_name).details, + is_disabled=is_disabled, + ) + + on_cluster = get_on_cluster_clause_for_table(table) + sync_execute( + f"ALTER TABLE {table} {on_cluster} COMMENT COLUMN {column_name} %(comment)s", + {"comment": details.as_column_comment()}, + settings={"alter_sync": 2 if TEST else 1}, + ) + + +def drop_column(table: TablesWithMaterializedColumns, column_name: str) -> None: + drop_minmax_index(table, column_name) + + on_cluster = get_on_cluster_clause_for_table(table) + sync_execute( + f"ALTER TABLE {table} {on_cluster} DROP COLUMN IF EXISTS {column_name}", + settings={"alter_sync": 2 if TEST else 1}, + ) + + if table == "events": + sync_execute( + f"ALTER TABLE sharded_{table} {on_cluster} DROP COLUMN IF EXISTS {column_name}", + {"property": property}, + settings={"alter_sync": 2 if TEST else 1}, + ) + + +def add_minmax_index(table: TablesWithMaterializedColumns, column_name: ColumnName): # Note: This will be populated on backfill - execute_on_cluster = f"ON CLUSTER '{CLICKHOUSE_CLUSTER}'" if table == "events" else "" - + on_cluster = get_on_cluster_clause_for_table(table) updated_table = "sharded_events" if table == "events" else table index_name = f"minmax_{column_name}" try: sync_execute( f""" - ALTER TABLE {updated_table} - {execute_on_cluster} + ALTER TABLE {updated_table} {on_cluster} ADD INDEX {index_name} {column_name} TYPE minmax GRANULARITY 1 """, @@ -137,6 +233,19 @@ def add_minmax_index(table: TablesWithMaterializedColumns, column_name: str): return index_name +def drop_minmax_index(table: TablesWithMaterializedColumns, column_name: ColumnName) -> None: + on_cluster = get_on_cluster_clause_for_table(table) + + # XXX: copy/pasted from `add_minmax_index` + updated_table = "sharded_events" if table == "events" else table + index_name = f"minmax_{column_name}" + + sync_execute( + f"ALTER TABLE {updated_table} {on_cluster} DROP INDEX IF EXISTS {index_name}", + settings={"alter_sync": 2 if TEST else 1}, + ) + + def backfill_materialized_columns( table: TableWithProperties, properties: list[tuple[PropertyName, TableColumn]], @@ -153,10 +262,9 @@ def backfill_materialized_columns( return updated_table = "sharded_events" if table == "events" else table - # :TRICKY: On cloud, we ON CLUSTER updates to events/sharded_events but not to persons. Why? ¯\_(ツ)_/¯ - execute_on_cluster = f"ON CLUSTER '{CLICKHOUSE_CLUSTER}'" if table == "events" else "" + on_cluster = get_on_cluster_clause_for_table(table) - materialized_columns = get_materialized_columns(table, use_cache=False) + materialized_columns = get_materialized_columns(table) # Hack from https://github.com/ClickHouse/ClickHouse/issues/19785 # Note that for this to work all inserts should list columns explicitly @@ -164,8 +272,7 @@ def backfill_materialized_columns( for property, table_column in properties: sync_execute( f""" - ALTER TABLE {updated_table} - {execute_on_cluster} + ALTER TABLE {updated_table} {on_cluster} MODIFY COLUMN {materialized_columns[(property, table_column)]} VARCHAR DEFAULT {TRIM_AND_EXTRACT_PROPERTY.format(table_column=table_column)} """, @@ -181,8 +288,7 @@ def backfill_materialized_columns( sync_execute( f""" - ALTER TABLE {updated_table} - {execute_on_cluster} + ALTER TABLE {updated_table} {on_cluster} UPDATE {assignments} WHERE {"timestamp > %(cutoff)s" if table == "events" else "1 = 1"} """, @@ -195,7 +301,7 @@ def _materialized_column_name( table: TableWithProperties, property: PropertyName, table_column: TableColumn = DEFAULT_TABLE_COLUMN, -) -> str: +) -> ColumnName: "Returns a sanitized and unique column name to use for materialized column" prefix = "pmat_" if table == "person" else "mat_" @@ -204,21 +310,10 @@ def _materialized_column_name( prefix += f"{SHORT_TABLE_COLUMN_NAME[table_column]}_" property_str = re.sub("[^0-9a-zA-Z$]", "_", property) - existing_materialized_columns = set(get_materialized_columns(table, use_cache=False).values()) + existing_materialized_columns = set(get_materialized_columns(table).values()) suffix = "" while f"{prefix}{property_str}{suffix}" in existing_materialized_columns: suffix = "_" + generate_random_short_suffix() return f"{prefix}{property_str}{suffix}" - - -def _extract_property(comment: str) -> tuple[PropertyName, TableColumn]: - # Old style comments have the format "column_materializer::property", dealing with the default table column. - # Otherwise, it's "column_materializer::table_column::property" - split_column = comment.split("::", 2) - - if len(split_column) == 2: - return split_column[1], DEFAULT_TABLE_COLUMN - - return split_column[2], cast(TableColumn, split_column[1]) diff --git a/ee/clickhouse/materialized_columns/test/test_columns.py b/ee/clickhouse/materialized_columns/test/test_columns.py index 472b46b1e80..4cbbef0c4a4 100644 --- a/ee/clickhouse/materialized_columns/test/test_columns.py +++ b/ee/clickhouse/materialized_columns/test/test_columns.py @@ -1,18 +1,25 @@ from datetime import timedelta from time import sleep +from unittest import TestCase from unittest.mock import patch from freezegun import freeze_time from ee.clickhouse.materialized_columns.columns import ( + MaterializedColumn, + MaterializedColumnDetails, backfill_materialized_columns, + drop_column, get_materialized_columns, materialize, + update_column_is_disabled, ) +from posthog.clickhouse.materialized_columns import TablesWithMaterializedColumns, get_enabled_materialized_columns from posthog.client import sync_execute from posthog.conftest import create_clickhouse_tables from posthog.constants import GROUP_TYPES_LIMIT from posthog.models.event.sql import EVENTS_DATA_TABLE +from posthog.models.property import PropertyName, TableColumn from posthog.settings import CLICKHOUSE_DATABASE from posthog.test.base import BaseTest, ClickhouseTestMixin, _create_event @@ -22,6 +29,46 @@ EVENTS_TABLE_DEFAULT_MATERIALIZED_COLUMNS = [f"$group_{i}" for i in range(GROUP_ ] +class TestMaterializedColumnDetails(TestCase): + def test_column_comment_formats(self): + old_format_comment = "column_materializer::foo" + old_format_details = MaterializedColumnDetails.from_column_comment(old_format_comment) + assert old_format_details == MaterializedColumnDetails( + "properties", # the default + "foo", + is_disabled=False, + ) + # old comment format is implicitly upgraded to the newer format when serializing + assert old_format_details.as_column_comment() == "column_materializer::properties::foo" + + new_format_comment = "column_materializer::person_properties::bar" + new_format_details = MaterializedColumnDetails.from_column_comment(new_format_comment) + assert new_format_details == MaterializedColumnDetails( + "person_properties", + "bar", + is_disabled=False, + ) + assert new_format_details.as_column_comment() == new_format_comment + + new_format_disabled_comment = "column_materializer::person_properties::bar::disabled" + new_format_disabled_details = MaterializedColumnDetails.from_column_comment(new_format_disabled_comment) + assert new_format_disabled_details == MaterializedColumnDetails( + "person_properties", + "bar", + is_disabled=True, + ) + assert new_format_disabled_details.as_column_comment() == new_format_disabled_comment + + with self.assertRaises(ValueError): + MaterializedColumnDetails.from_column_comment("bad-prefix::property") + + with self.assertRaises(ValueError): + MaterializedColumnDetails.from_column_comment("bad-prefix::column::property") + + with self.assertRaises(ValueError): + MaterializedColumnDetails.from_column_comment("column_materializer::column::property::enabled") + + class TestMaterializedColumns(ClickhouseTestMixin, BaseTest): def setUp(self): self.recreate_database() @@ -50,24 +97,33 @@ class TestMaterializedColumns(ClickhouseTestMixin, BaseTest): materialize("person", "$zeta", create_minmax_index=True) self.assertCountEqual( - [property_name for property_name, _ in get_materialized_columns("events", use_cache=True).keys()], + [ + property_name + for property_name, _ in get_enabled_materialized_columns("events", use_cache=True).keys() + ], ["$foo", "$bar", *EVENTS_TABLE_DEFAULT_MATERIALIZED_COLUMNS], ) self.assertCountEqual( - get_materialized_columns("person", use_cache=True).keys(), + get_enabled_materialized_columns("person", use_cache=True).keys(), [("$zeta", "properties")], ) materialize("events", "abc", create_minmax_index=True) self.assertCountEqual( - [property_name for property_name, _ in get_materialized_columns("events", use_cache=True).keys()], + [ + property_name + for property_name, _ in get_enabled_materialized_columns("events", use_cache=True).keys() + ], ["$foo", "$bar", *EVENTS_TABLE_DEFAULT_MATERIALIZED_COLUMNS], ) with freeze_time("2020-01-04T14:00:01Z"): self.assertCountEqual( - [property_name for property_name, _ in get_materialized_columns("events", use_cache=True).keys()], + [ + property_name + for property_name, _ in get_enabled_materialized_columns("events", use_cache=True).keys() + ], ["$foo", "$bar", "abc", *EVENTS_TABLE_DEFAULT_MATERIALIZED_COLUMNS], ) @@ -238,3 +294,45 @@ class TestMaterializedColumns(ClickhouseTestMixin, BaseTest): "column": column, }, )[0] + + def test_lifecycle(self): + table: TablesWithMaterializedColumns = "events" + property: PropertyName = "myprop" + source_column: TableColumn = "properties" + + # create the materialized column + destination_column = materialize(table, property, table_column=source_column, create_minmax_index=True) + assert destination_column is not None + + # ensure it exists everywhere + key = (property, source_column) + assert get_materialized_columns(table)[key] == destination_column + assert MaterializedColumn.get(table, destination_column) == MaterializedColumn( + destination_column, + MaterializedColumnDetails(source_column, property, is_disabled=False), + ) + + # disable it and ensure updates apply as needed + update_column_is_disabled(table, destination_column, is_disabled=True) + assert get_materialized_columns(table)[key] == destination_column + assert key not in get_materialized_columns(table, exclude_disabled_columns=True) + assert MaterializedColumn.get(table, destination_column) == MaterializedColumn( + destination_column, + MaterializedColumnDetails(source_column, property, is_disabled=True), + ) + + # re-enable it and ensure updates apply as needed + update_column_is_disabled(table, destination_column, is_disabled=False) + assert get_materialized_columns(table, exclude_disabled_columns=False)[key] == destination_column + assert get_materialized_columns(table, exclude_disabled_columns=True)[key] == destination_column + assert MaterializedColumn.get(table, destination_column) == MaterializedColumn( + destination_column, + MaterializedColumnDetails(source_column, property, is_disabled=False), + ) + + # drop it and ensure updates apply as needed + drop_column(table, destination_column) + assert key not in get_materialized_columns(table, exclude_disabled_columns=False) + assert key not in get_materialized_columns(table, exclude_disabled_columns=True) + with self.assertRaises(ValueError): + MaterializedColumn.get(table, destination_column) diff --git a/ee/clickhouse/queries/funnels/funnel_correlation.py b/ee/clickhouse/queries/funnels/funnel_correlation.py index d23c459c7ef..c29aa81355e 100644 --- a/ee/clickhouse/queries/funnels/funnel_correlation.py +++ b/ee/clickhouse/queries/funnels/funnel_correlation.py @@ -13,7 +13,7 @@ from rest_framework.exceptions import ValidationError from ee.clickhouse.queries.column_optimizer import EnterpriseColumnOptimizer from ee.clickhouse.queries.groups_join_query import GroupsJoinQuery -from posthog.clickhouse.materialized_columns import get_materialized_columns +from posthog.clickhouse.materialized_columns import get_enabled_materialized_columns from posthog.constants import ( AUTOCAPTURE_EVENT, TREND_FILTER_TYPE_ACTIONS, @@ -156,7 +156,7 @@ class FunnelCorrelation: ): # When dealing with properties, make sure funnel response comes with properties # so we don't have to join on persons/groups to get these properties again - mat_event_cols = get_materialized_columns("events") + mat_event_cols = get_enabled_materialized_columns("events") for property_name in cast(list, self._filter.correlation_property_names): if self._filter.aggregation_group_type_index is not None: diff --git a/ee/management/commands/update_materialized_column.py b/ee/management/commands/update_materialized_column.py new file mode 100644 index 00000000000..b45444eb483 --- /dev/null +++ b/ee/management/commands/update_materialized_column.py @@ -0,0 +1,29 @@ +import logging + +from typing import Any +from collections.abc import Callable +from django.core.management.base import BaseCommand, CommandParser + +from posthog.clickhouse.materialized_columns import ColumnName, TablesWithMaterializedColumns +from ee.clickhouse.materialized_columns.columns import update_column_is_disabled, drop_column + +logger = logging.getLogger(__name__) + +COLUMN_OPERATIONS: dict[str, Callable[[TablesWithMaterializedColumns, ColumnName], Any]] = { + "enable": lambda table, column_name: update_column_is_disabled(table, column_name, is_disabled=False), + "disable": lambda table, column_name: update_column_is_disabled(table, column_name, is_disabled=True), + "drop": drop_column, +} + + +class Command(BaseCommand): + def add_arguments(self, parser: CommandParser) -> None: + parser.add_argument("operation", choices=COLUMN_OPERATIONS.keys()) + parser.add_argument("table") + parser.add_argument("column_name") + + def handle(self, operation: str, table: TablesWithMaterializedColumns, column_name: ColumnName, **options): + logger.info("Running %r for %r.%r...", operation, table, column_name) + fn = COLUMN_OPERATIONS[operation] + fn(table, column_name) + logger.info("Success!") diff --git a/ee/tasks/materialized_columns.py b/ee/tasks/materialized_columns.py index a6d997bfb1c..d05cdddc0b0 100644 --- a/ee/tasks/materialized_columns.py +++ b/ee/tasks/materialized_columns.py @@ -6,7 +6,7 @@ from ee.clickhouse.materialized_columns.columns import ( ) from posthog.client import sync_execute from posthog.settings import CLICKHOUSE_CLUSTER, CLICKHOUSE_DATABASE -from posthog.clickhouse.materialized_columns import ColumnName +from posthog.clickhouse.materialized_columns import ColumnName, TablesWithMaterializedColumns logger = get_task_logger(__name__) @@ -39,8 +39,9 @@ def mark_all_materialized() -> None: def get_materialized_columns_with_default_expression(): - for table in ["events", "person"]: - materialized_columns = get_materialized_columns(table, use_cache=False) + tables: list[TablesWithMaterializedColumns] = ["events", "person"] + for table in tables: + materialized_columns = get_materialized_columns(table) for (property_name, table_column), column_name in materialized_columns.items(): if is_default_expression(table, column_name): yield table, property_name, table_column, column_name diff --git a/posthog/api/test/test_person.py b/posthog/api/test/test_person.py index fd2c6e042a1..b58ed858d9e 100644 --- a/posthog/api/test/test_person.py +++ b/posthog/api/test/test_person.py @@ -873,7 +873,7 @@ class TestPerson(ClickhouseTestMixin, APIBaseTest): create_person(team_id=self.team.pk, version=0) returned_ids = [] - with self.assertNumQueries(7): + with self.assertNumQueries(8): response = self.client.get("/api/person/?limit=10").json() self.assertEqual(len(response["results"]), 9) returned_ids += [x["distinct_ids"][0] for x in response["results"]] diff --git a/posthog/clickhouse/materialized_columns.py b/posthog/clickhouse/materialized_columns.py index 638e82270f8..2ff858274ab 100644 --- a/posthog/clickhouse/materialized_columns.py +++ b/posthog/clickhouse/materialized_columns.py @@ -1,19 +1,26 @@ -from posthog.models.property import TableWithProperties +from datetime import timedelta + +from posthog.cache_utils import cache_for +from posthog.models.property import PropertyName, TableColumn, TableWithProperties from posthog.settings import EE_AVAILABLE + ColumnName = str TablesWithMaterializedColumns = TableWithProperties if EE_AVAILABLE: from ee.clickhouse.materialized_columns.columns import get_materialized_columns else: - from datetime import timedelta - from posthog.cache_utils import cache_for - from posthog.models.property import PropertyName, TableColumn - - @cache_for(timedelta(minutes=15)) def get_materialized_columns( table: TablesWithMaterializedColumns, + exclude_disabled_columns: bool = False, ) -> dict[tuple[PropertyName, TableColumn], ColumnName]: return {} + + +@cache_for(timedelta(minutes=15)) +def get_enabled_materialized_columns( + table: TablesWithMaterializedColumns, +) -> dict[tuple[PropertyName, TableColumn], ColumnName]: + return get_materialized_columns(table, exclude_disabled_columns=True) diff --git a/posthog/clickhouse/migrations/0026_fix_materialized_window_and_session_ids.py b/posthog/clickhouse/migrations/0026_fix_materialized_window_and_session_ids.py index e927636e05e..7e7847c570b 100644 --- a/posthog/clickhouse/migrations/0026_fix_materialized_window_and_session_ids.py +++ b/posthog/clickhouse/migrations/0026_fix_materialized_window_and_session_ids.py @@ -45,7 +45,7 @@ def materialize_session_and_window_id(database): properties = ["$session_id", "$window_id"] for property_name in properties: - materialized_columns = get_materialized_columns("events", use_cache=False) + materialized_columns = get_materialized_columns("events") # If the column is not materialized, materialize it if (property_name, "properties") not in materialized_columns: materialize("events", property_name, property_name) diff --git a/posthog/hogql/printer.py b/posthog/hogql/printer.py index d86bbc8f347..da81bdd32d3 100644 --- a/posthog/hogql/printer.py +++ b/posthog/hogql/printer.py @@ -6,7 +6,7 @@ from difflib import get_close_matches from typing import Literal, Optional, Union, cast from uuid import UUID -from posthog.clickhouse.materialized_columns import TablesWithMaterializedColumns, get_materialized_columns +from posthog.clickhouse.materialized_columns import TablesWithMaterializedColumns, get_enabled_materialized_columns from posthog.clickhouse.property_groups import property_groups from posthog.hogql import ast from posthog.hogql.base import AST, _T_AST @@ -1510,7 +1510,7 @@ class _Printer(Visitor): def _get_materialized_column( self, table_name: str, property_name: PropertyName, field_name: TableColumn ) -> Optional[str]: - materialized_columns = get_materialized_columns(cast(TablesWithMaterializedColumns, table_name)) + materialized_columns = get_enabled_materialized_columns(cast(TablesWithMaterializedColumns, table_name)) return materialized_columns.get((property_name, field_name), None) def _get_timezone(self) -> str: diff --git a/posthog/hogql/transforms/property_types.py b/posthog/hogql/transforms/property_types.py index 50095ff2015..da36de8d988 100644 --- a/posthog/hogql/transforms/property_types.py +++ b/posthog/hogql/transforms/property_types.py @@ -1,6 +1,6 @@ from typing import Literal, Optional, cast -from posthog.clickhouse.materialized_columns import TablesWithMaterializedColumns, get_materialized_columns +from posthog.clickhouse.materialized_columns import TablesWithMaterializedColumns, get_enabled_materialized_columns from posthog.hogql import ast from posthog.hogql.context import HogQLContext from posthog.hogql.database.models import ( @@ -273,5 +273,5 @@ class PropertySwapper(CloningVisitor): def _get_materialized_column( self, table_name: str, property_name: PropertyName, field_name: TableColumn ) -> Optional[str]: - materialized_columns = get_materialized_columns(cast(TablesWithMaterializedColumns, table_name)) + materialized_columns = get_enabled_materialized_columns(cast(TablesWithMaterializedColumns, table_name)) return materialized_columns.get((property_name, field_name), None) diff --git a/posthog/models/property/util.py b/posthog/models/property/util.py index 0a28a3f3f06..90651b6cd1e 100644 --- a/posthog/models/property/util.py +++ b/posthog/models/property/util.py @@ -16,7 +16,7 @@ from rest_framework import exceptions from posthog.clickhouse.kafka_engine import trim_quotes_expr from posthog.clickhouse.materialized_columns import ( TableWithProperties, - get_materialized_columns, + get_enabled_materialized_columns, ) from posthog.constants import PropertyOperatorType from posthog.hogql import ast @@ -711,7 +711,7 @@ def get_property_string_expr( (optional) alias of the table being queried :return: """ - materialized_columns = get_materialized_columns(table) if allow_denormalized_props else {} + materialized_columns = get_enabled_materialized_columns(table) if allow_denormalized_props else {} table_string = f"{table_alias}." if table_alias is not None and table_alias != "" else "" diff --git a/posthog/queries/column_optimizer/foss_column_optimizer.py b/posthog/queries/column_optimizer/foss_column_optimizer.py index 5553384132e..4fffbd1faa3 100644 --- a/posthog/queries/column_optimizer/foss_column_optimizer.py +++ b/posthog/queries/column_optimizer/foss_column_optimizer.py @@ -3,7 +3,7 @@ from collections import Counter as TCounter from typing import Union, cast from collections.abc import Generator -from posthog.clickhouse.materialized_columns import ColumnName, get_materialized_columns +from posthog.clickhouse.materialized_columns import ColumnName, get_enabled_materialized_columns from posthog.constants import TREND_FILTER_TYPE_ACTIONS, FunnelCorrelationType from posthog.models.action.util import ( get_action_tables_and_properties, @@ -73,7 +73,7 @@ class FOSSColumnOptimizer: ) -> set[ColumnName]: "Transforms a list of property names to what columns are needed for that query" - materialized_columns = get_materialized_columns(table) + materialized_columns = get_enabled_materialized_columns(table) return { materialized_columns.get((property_name, table_column), table_column) for property_name, _, _ in used_properties diff --git a/posthog/tasks/usage_report.py b/posthog/tasks/usage_report.py index a7e8930c769..f9e3982409d 100644 --- a/posthog/tasks/usage_report.py +++ b/posthog/tasks/usage_report.py @@ -19,7 +19,7 @@ from sentry_sdk import capture_exception from posthog import version_requirement from posthog.clickhouse.client.connection import Workload -from posthog.clickhouse.materialized_columns import get_materialized_columns +from posthog.clickhouse.materialized_columns import get_enabled_materialized_columns from posthog.client import sync_execute from posthog.cloud_utils import get_cached_instance_license, is_cloud from posthog.constants import FlagRequestType @@ -459,7 +459,7 @@ def get_teams_with_event_count_with_groups_in_period(begin: datetime, end: datet @timed_log() @retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF) def get_all_event_metrics_in_period(begin: datetime, end: datetime) -> dict[str, list[tuple[int, int]]]: - materialized_columns = get_materialized_columns("events") + materialized_columns = get_enabled_materialized_columns("events") # Check if $lib is materialized lib_expression = materialized_columns.get(("$lib", "properties"), "JSONExtractString(properties, '$lib')")