0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-11-24 09:14:46 +01:00
posthog/ee/tasks/materialized_columns.py
Eric Duong 46faa8fab2
refactor(FOSS): foss remove all ee dependencies from /posthog (#10319)
* refactor: foss move properties to /posthog

* refactor: move replication and materialization

* refactor: move file

* refactor: move test and journeys

* refactor: move breakdown props

* refactor: move query imports

* refactor: move more ee dependencies

* refactor: restore groupsjoinquery

* fix: errors

* refactor: no ee.clickhouse dependencies

* try import tasks

* refactor: move materialization

* refactor: change foss split for column optimizer

* run black

* fix: imports

* remove comment

* Update snapshots

* run black

* skip isort

* Update snapshots

* format

* more fixes

* refactor(FOSS): split out paths query code (#10378)

* refactor: move migrations

* refactor: move idl

* fix: move more imports

* import adjustments

* fix: test import

* fix: test import

* fix: remove restriction

* refactor: split out paths query code

* refactor: more code splitting

* fix: types

* refactor(FOSS): Setup such that FOSS is deployable (#10352)

* refactor: move migrations

* refactor: move idl

* fix: move more imports

* import adjustments

* fix: test import

* fix: test import

* fix: remove restriction

* fix import

* refactor: add snapshot

* subscription-import

* fix: safe import

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
2022-06-22 17:24:03 -04:00

53 lines
2.2 KiB
Python

from celery.utils.log import get_task_logger
from ee.clickhouse.materialized_columns.columns import TRIM_AND_EXTRACT_PROPERTY, ColumnName, get_materialized_columns
from posthog.clickhouse.replication.utils import clickhouse_is_replicated
from posthog.client import sync_execute
from posthog.settings import CLICKHOUSE_CLUSTER, CLICKHOUSE_DATABASE
# Module-level logger obtained from Celery's get_task_logger, named after this module.
logger = get_task_logger(__name__)
def mark_all_materialized() -> None:
    """Switch materialized-property columns from DEFAULT to MATERIALIZED.

    If ClickHouse reports any unfinished mutation, log a message and do
    nothing. Otherwise, for every ``(table, property_name, column_name)``
    produced by ``get_materialized_columns_with_default_expression``, issue an
    ``ALTER TABLE ... MODIFY COLUMN ... MATERIALIZED`` statement, passing the
    property name as the ``property`` query parameter.
    """
    if any_ongoing_mutations():
        logger.info("There are running mutations, skipping marking as materialized")
        return

    for table, property_name, column_name in get_materialized_columns_with_default_expression():
        is_events = table == "events"
        # On replicated setups the events data lives in sharded_events.
        target_table = "sharded_events" if clickhouse_is_replicated() and is_events else table

        # :TRICKY: On cloud, updates to events/sharded_events run ON CLUSTER but
        # updates to persons do not. The reason is unknown.
        cluster_clause = f"ON CLUSTER '{CLICKHOUSE_CLUSTER}'" if is_events else ""

        alter_query = f"""
            ALTER TABLE {target_table}
            {cluster_clause}
            MODIFY COLUMN
            {column_name} VARCHAR MATERIALIZED {TRIM_AND_EXTRACT_PROPERTY}
            """
        sync_execute(alter_query, {"property": property_name})
def get_materialized_columns_with_default_expression():
    """Lazily yield materialized columns still defined with a DEFAULT expression.

    Tables are visited in the order ``events`` then ``person``. Column
    metadata is fetched uncached (``use_cache=False``). Each column that
    ``is_default_expression`` accepts is yielded as a
    ``(table, property_name, column_name)`` tuple.
    """
    for table_name in ("events", "person"):
        columns_by_property = get_materialized_columns(table_name, use_cache=False)
        yield from (
            (table_name, prop, col)
            for prop, col in columns_by_property.items()
            if is_default_expression(table_name, col)
        )
def any_ongoing_mutations() -> bool:
    """Return True when ``system.mutations`` has at least one row with ``is_done = 0``."""
    result_rows = sync_execute("SELECT count(*) FROM system.mutations WHERE is_done = 0")
    first_row = result_rows[0]
    pending_count = first_row[0]
    return pending_count > 0
def is_default_expression(table: str, column_name: ColumnName) -> bool:
    """Return whether ``column_name`` on ``table`` has ``default_kind == 'DEFAULT'``.

    The ``events`` table is looked up as ``sharded_events`` when
    ``clickhouse_is_replicated()`` is true. Other tables are looked up
    under their own name. The lookup is restricted to ``CLICKHOUSE_DATABASE``.

    Returns:
        False if no matching row exists in ``system.columns``. Otherwise,
        whether the first row's ``default_kind`` equals ``"DEFAULT"``.
    """
    # Bug fix: the original tested the bare function object, which is always
    # truthy, so non-replicated setups queried a nonexistent sharded_events table.
    updated_table = "sharded_events" if clickhouse_is_replicated() and table == "events" else table
    column_query = sync_execute(
        "SELECT default_kind FROM system.columns WHERE table = %(table)s AND name = %(name)s AND database = %(database)s",
        {"table": updated_table, "name": column_name, "database": CLICKHOUSE_DATABASE},
    )
    return len(column_query) > 0 and column_query[0][0] == "DEFAULT"