mirror of https://github.com/PostHog/posthog.git synced 2024-11-28 18:26:15 +01:00
posthog/ee/clickhouse/system_status.py
James Greenhill 492e8e25d1
Add events count to system status page (#6062)
* Add events count to system status page

* add events count for last month and month to date
2021-09-21 22:48:32 +00:00
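The commit above adds event-count rows to the output of `system_status()` in the file below. As a rough, hypothetical sketch (not part of the file), a caller could collect the generator into a dict keyed by each row's "key":

    rows = {row["key"]: row for row in system_status()}
    rows["clickhouse_event_count_last_month"]["value"]      # events recorded last month
    rows["clickhouse_event_count_month_to_date"]["value"]   # events recorded month to date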


import glob
import subprocess
import tempfile
import uuid
from os.path import abspath, basename, dirname, join
from typing import Dict, Generator, List, Tuple

import sqlparse
from clickhouse_driver import Client
from dateutil.relativedelta import relativedelta
from django.utils import timezone
from sentry_sdk.api import capture_exception

from ee.clickhouse.client import make_ch_pool, sync_execute
from ee.clickhouse.models.event import get_event_count, get_event_count_for_last_month, get_event_count_month_to_date
from posthog.settings import CLICKHOUSE_PASSWORD, CLICKHOUSE_STABLE_HOST, CLICKHOUSE_USER

SLOW_THRESHOLD_MS = 10000
SLOW_AFTER = relativedelta(hours=6)

CLICKHOUSE_FLAMEGRAPH_EXECUTABLE = abspath(join(dirname(__file__), "bin", "clickhouse-flamegraph"))
FLAMEGRAPH_PL = abspath(join(dirname(__file__), "bin", "flamegraph.pl"))

SystemStatusRow = Dict
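# system_status() feeds the system status page: each yielded row is a dict with
# "key", "metric" and "value" (and optionally "subrows" for tabular details).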
def system_status() -> Generator[SystemStatusRow, None, None]:
    alive = is_alive()
    yield {"key": "clickhouse_alive", "metric": "Clickhouse database alive", "value": alive}

    if not alive:
        return

    yield {"key": "clickhouse_event_count", "metric": "Events in ClickHouse", "value": get_event_count()}
    yield {
        "key": "clickhouse_event_count_last_month",
        "metric": "Events recorded last month",
        "value": get_event_count_for_last_month(),
    }
    yield {
        "key": "clickhouse_event_count_month_to_date",
        "metric": "Events recorded month to date",
        "value": get_event_count_month_to_date(),
    }

    disk_status = sync_execute(
        "SELECT formatReadableSize(total_space), formatReadableSize(free_space) FROM system.disks"
    )

    for index, (total_space, free_space) in enumerate(disk_status):
        metric = "Clickhouse disk" if len(disk_status) == 1 else f"Clickhouse disk {index}"
        yield {"key": f"clickhouse_disk_{index}_free_space", "metric": f"{metric} free space", "value": free_space}
        yield {"key": f"clickhouse_disk_{index}_total_space", "metric": f"{metric} total space", "value": total_space}

    table_sizes = sync_execute(
        """
        SELECT
            table,
            formatReadableSize(sum(bytes)) AS size,
            sum(rows) AS rows
        FROM system.parts
        WHERE active
        GROUP BY table
        ORDER BY rows DESC
        """
    )

    yield {
        "key": "clickhouse_table_sizes",
        "metric": "Clickhouse table sizes",
        "value": "",
        "subrows": {"columns": ["Table", "Size", "Rows"], "rows": table_sizes},
    }

    system_metrics = sync_execute("SELECT * FROM system.asynchronous_metrics")
    system_metrics += sync_execute("SELECT * FROM system.metrics")

    yield {
        "key": "clickhouse_system_metrics",
        "metric": "Clickhouse system metrics",
        "value": "",
        "subrows": {"columns": ["Metric", "Value", "Description"], "rows": list(sorted(system_metrics))},
    }


def is_alive() -> bool:
    try:
        sync_execute("SELECT 1")
        return True
    except:
        return False


def get_clickhouse_running_queries() -> List[Dict]:
    return query_with_columns(
        "SELECT elapsed as duration, query, * FROM system.processes ORDER BY duration DESC",
        columns_to_remove=["address", "initial_address", "elapsed"],
    )


def get_clickhouse_slow_log() -> List[Dict]:
    return query_with_columns(
        f"""
            SELECT query_duration_ms as duration, query, *
            FROM system.query_log
            WHERE query_duration_ms > {SLOW_THRESHOLD_MS}
              AND event_time > %(after)s
              AND query NOT LIKE '%%system.query_log%%'
              AND query NOT LIKE '%%analyze_query:%%'
            ORDER BY duration DESC
            LIMIT 200
        """,
        {"after": timezone.now() - SLOW_AFTER},
        columns_to_remove=[
            "address",
            "initial_address",
            "query_duration_ms",
            "event_time",
            "event_date",
            "query_start_time_microseconds",
            "thread_ids",
            "ProfileEvents.Names",
            "ProfileEvents.Values",
            "Settings.Names",
            "Settings.Values",
        ],
    )
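# Runs `query` and zips each result row with its column names, dropping any
# columns listed in `columns_to_remove`; list values are flattened to
# comma-separated strings.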
def query_with_columns(query, args=None, columns_to_remove=[]) -> List[Dict]:
    metrics, types = sync_execute(query, args, with_column_types=True)
    type_names = [key for key, _type in types]

    rows = []
    for row in metrics:
        result = {}
        for type_name, value in zip(type_names, row):
            if isinstance(value, list):
                value = ", ".join(map(str, value))
            if type_name not in columns_to_remove:
                result[type_name] = value
        rows.append(result)
    return rows
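# analyze_query() re-runs `query` with ClickHouse's introspection and profiler
# settings enabled and tags it with a random "-- analyze_query:<uuid>" comment so
# the run can be located in system.query_log afterwards. It returns the formatted
# SQL, timing stats and flamegraphs. Illustrative usage (e.g. from a Django shell;
# the query is only an example):
#
#     report = analyze_query("SELECT count() FROM events")
#     report["timing"]["query_duration_ms"], list(report["flamegraphs"])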
def analyze_query(query: str):
    random_id = str(uuid.uuid4())

    # :TRICKY: Ensure all queries run on the same host.
    ch_pool = make_ch_pool(host=CLICKHOUSE_STABLE_HOST)
    with ch_pool.get_client() as conn:
        conn.execute(
            f"""
            -- analyze_query:{random_id}
            {query}
            """,
            settings={
                "allow_introspection_functions": 1,
                "query_profiler_real_time_period_ns": 40000000,
                "query_profiler_cpu_time_period_ns": 40000000,
                "memory_profiler_step": 1048576,
                "max_untracked_memory": 1048576,
                "memory_profiler_sample_probability": 0.01,
                "use_uncompressed_cache": 0,
                "readonly": 1,
                "allow_ddl": 0,
            },
        )

        query_id, timing_info = get_query_timing_info(random_id, conn)

        return {
            "query": sqlparse.format(query, reindent_aligned=True),
            "timing": timing_info,
            "flamegraphs": get_flamegraphs(query_id),
        }
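# Flushes the query log, then looks up the tagged query in system.query_log and
# returns its query_id along with duration, rows read, result size and memory usage.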
def get_query_timing_info(random_id: str, conn: Client) -> Tuple[str, Dict]:
    conn.execute("SYSTEM FLUSH LOGS")
    results = conn.execute(
        """
        SELECT
            query_id,
            event_time,
            query_duration_ms,
            read_rows,
            formatReadableSize(read_bytes) as read_size,
            result_rows,
            formatReadableSize(result_bytes) as result_size,
            formatReadableSize(memory_usage) as memory_usage
        FROM system.query_log
        WHERE query NOT LIKE '%%query_log%%'
          AND match(query, %(expr)s)
          AND type = 'QueryFinish'
        LIMIT 1
        """,
        {"expr": f"analyze_query:{random_id}"},
    )

    return (
        results[0][0],
        dict(
            zip(
                [
                    "query_id",
                    "event_time",
                    "query_duration_ms",
                    "read_rows",
                    "read_size",
                    "result_rows",
                    "result_size",
                    "memory_usage",
                ],
                results[0],
            )
        ),
    )
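# Renders flamegraph SVGs for the given query_id using the bundled
# clickhouse-flamegraph binary and flamegraph.pl script; any failure is reported
# to Sentry and an empty dict is returned.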
def get_flamegraphs(query_id: str) -> Dict:
    try:
        with tempfile.TemporaryDirectory() as tmpdirname:
            subprocess.run(
                [
                    CLICKHOUSE_FLAMEGRAPH_EXECUTABLE,
                    "--query-id",
                    query_id,
                    "--clickhouse-dsn",
                    f"http://{CLICKHOUSE_USER}:{CLICKHOUSE_PASSWORD}@{CLICKHOUSE_STABLE_HOST}:8123/",
                    "--console",
                    "--flamegraph-script",
                    FLAMEGRAPH_PL,
                    "--date-from",
                    "2021-01-01",
                    "--width",
                    "1900",
                ],
                cwd=tmpdirname,
                check=True,
            )

            flamegraphs = {}
            for file_path in glob.glob(join(tmpdirname, "*/*/global*.svg")):
                with open(file_path) as file:
                    flamegraphs[basename(file_path)] = file.read()

            return flamegraphs
    except Exception as err:
        capture_exception(err)
        return {}