mirror of https://github.com/PostHog/posthog.git synced 2024-12-01 04:12:23 +01:00
posthog/ee/clickhouse/system_status.py
Yakko Majuri 5c0054c238
Dead letter queue initial metrics dashboard (#8347)
* dead letter queue metrics dashboard

* basic dlq metrics dashboard redo (#8396)

* update single value ui

* address review

* remove useless comment
2022-02-03 14:56:18 -05:00


import glob
import subprocess
import tempfile
import uuid
from os.path import abspath, basename, dirname, join
from typing import Dict, Generator, List, Tuple
import sqlparse
from clickhouse_driver import Client
from dateutil.relativedelta import relativedelta
from django.utils import timezone
from sentry_sdk.api import capture_exception
from ee.clickhouse.client import make_ch_pool, sync_execute
from ee.clickhouse.models.event import get_event_count, get_event_count_for_last_month, get_event_count_month_to_date
from posthog.api.dead_letter_queue import get_dead_letter_queue_events_last_24h, get_dead_letter_queue_size
from posthog.settings import CLICKHOUSE_PASSWORD, CLICKHOUSE_STABLE_HOST, CLICKHOUSE_USER
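
# Thresholds for the slow query log, plus paths to the bundled flamegraph tooling.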
SLOW_THRESHOLD_MS = 10000
SLOW_AFTER = relativedelta(hours=6)
CLICKHOUSE_FLAMEGRAPH_EXECUTABLE = abspath(join(dirname(__file__), "bin", "clickhouse-flamegraph"))
FLAMEGRAPH_PL = abspath(join(dirname(__file__), "bin", "flamegraph.pl"))
SystemStatusRow = Dict
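
# A status row is a dict with "key", "metric", "value" and optionally "subrows".
# system_status() yields one row per reported metric: liveness, event counts, disk
# usage, table sizes, system metrics, the last ingested event timestamp and dead
# letter queue health.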
def system_status() -> Generator[SystemStatusRow, None, None]:
alive = is_alive()
yield {"key": "clickhouse_alive", "metric": "Clickhouse database alive", "value": alive}
if not alive:
return
yield {"key": "clickhouse_event_count", "metric": "Events in ClickHouse", "value": get_event_count()}
yield {
"key": "clickhouse_event_count_last_month",
"metric": "Events recorded last month",
"value": get_event_count_for_last_month(),
}
yield {
"key": "clickhouse_event_count_month_to_date",
"metric": "Events recorded month to date",
"value": get_event_count_month_to_date(),
}
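
    # Disk usage, reported separately for each ClickHouse disk when there is more than one.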
disk_status = sync_execute(
"SELECT formatReadableSize(total_space), formatReadableSize(free_space) FROM system.disks"
)
for index, (total_space, free_space) in enumerate(disk_status):
metric = "Clickhouse disk" if len(disk_status) == 1 else f"Clickhouse disk {index}"
yield {"key": f"clickhouse_disk_{index}_free_space", "metric": f"{metric} free space", "value": free_space}
yield {"key": f"clickhouse_disk_{index}_total_space", "metric": f"{metric} total space", "value": total_space}
table_sizes = sync_execute(
"""
SELECT
table,
formatReadableSize(sum(bytes)) AS size,
sum(rows) AS rows
FROM system.parts
WHERE active
GROUP BY table
ORDER BY rows DESC
"""
)
yield {
"key": "clickhouse_table_sizes",
"metric": "Clickhouse table sizes",
"value": "",
"subrows": {"columns": ["Table", "Size", "Rows"], "rows": table_sizes},
}
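
    # Raw ClickHouse metrics from system.asynchronous_metrics and system.metrics, merged and sorted.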
system_metrics = sync_execute("SELECT * FROM system.asynchronous_metrics")
system_metrics += sync_execute("SELECT * FROM system.metrics")
yield {
"key": "clickhouse_system_metrics",
"metric": "Clickhouse system metrics",
"value": "",
"subrows": {"columns": ["Metric", "Value", "Description"], "rows": list(sorted(system_metrics))},
}
last_event_ingested_timestamp = sync_execute("SELECT max(_timestamp) FROM events")[0][0]
yield {
"key": "last_event_ingested_timestamp",
"metric": "Last event ingested",
"value": last_event_ingested_timestamp,
}
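
    # Dead letter queue health: total size, events routed there in the last 24h, and
    # whether their share of all events over that window is still acceptable.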
dead_letter_queue_size = get_dead_letter_queue_size()
yield {
"key": "dead_letter_queue_size",
"metric": "Dead letter queue size",
"value": dead_letter_queue_size,
}
dead_letter_queue_events_last_day = get_dead_letter_queue_events_last_24h()
yield {
"key": "dead_letter_queue_events_last_day",
"metric": "Events sent to dead letter queue in the last 24h",
"value": dead_letter_queue_events_last_day,
}
total_events_ingested_last_day = sync_execute(
"SELECT count(*) as b from events WHERE _timestamp >= (NOW() - INTERVAL 1 DAY)"
)[0][0]
dead_letter_queue_ingestion_ratio = dead_letter_queue_events_last_day / max(
dead_letter_queue_events_last_day + total_events_ingested_last_day, 1
)
    # Alert when the dead letter queue accounts for 20% or more of all events seen in the last 24h.
dead_letter_queue_events_high = dead_letter_queue_ingestion_ratio >= 0.2
yield {
"key": "dead_letter_queue_ratio_ok",
"metric": "Dead letter queue ratio healthy",
"value": not dead_letter_queue_events_high,
}
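
# Simple liveness probe: can ClickHouse answer a trivial query at all?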
def is_alive() -> bool:
try:
sync_execute("SELECT 1")
return True
    except Exception:
return False
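
# Currently running queries from system.processes, longest-running first, with
# connection addresses stripped from the output.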
def get_clickhouse_running_queries() -> List[Dict]:
return query_with_columns(
"SELECT elapsed as duration, query, * FROM system.processes ORDER BY duration DESC",
columns_to_remove=["address", "initial_address", "elapsed"],
)
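
# Recent slow queries from system.query_log (above SLOW_THRESHOLD_MS within the last
# SLOW_AFTER window), excluding the query log itself and analyze_query traffic.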
def get_clickhouse_slow_log() -> List[Dict]:
return query_with_columns(
f"""
SELECT query_duration_ms as duration, query, *
FROM system.query_log
WHERE query_duration_ms > {SLOW_THRESHOLD_MS}
AND event_time > %(after)s
AND query NOT LIKE '%%system.query_log%%'
AND query NOT LIKE '%%analyze_query:%%'
ORDER BY duration DESC
LIMIT 200
""",
{"after": timezone.now() - SLOW_AFTER},
columns_to_remove=[
"address",
"initial_address",
"query_duration_ms",
"event_time",
"event_date",
"query_start_time_microseconds",
"thread_ids",
"ProfileEvents.Names",
"ProfileEvents.Values",
"Settings.Names",
"Settings.Values",
],
)
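
# Runs a query and returns each row as a dict keyed by column name, dropping unwanted
# columns and flattening list values into comma-separated strings.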
def query_with_columns(query, args=None, columns_to_remove=None) -> List[Dict]:
    columns_to_remove = columns_to_remove or []
    metrics, types = sync_execute(query, args, with_column_types=True)
type_names = [key for key, _type in types]
rows = []
for row in metrics:
result = {}
for type_name, value in zip(type_names, row):
if isinstance(value, list):
value = ", ".join(map(str, value))
if type_name not in columns_to_remove:
result[type_name] = value
rows.append(result)
return rows
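
# Profiles a single query: runs it with ClickHouse's query and memory profilers enabled,
# tagging it with a unique marker comment, then collects timing stats and flamegraphs.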
def analyze_query(query: str) -> Dict:
random_id = str(uuid.uuid4())
# :TRICKY: Ensure all queries run on the same host.
ch_pool = make_ch_pool(host=CLICKHOUSE_STABLE_HOST)
with ch_pool.get_client() as conn:
conn.execute(
f"""
-- analyze_query:{random_id}
{query}
""",
settings={
"allow_introspection_functions": 1,
"query_profiler_real_time_period_ns": 40000000,
"query_profiler_cpu_time_period_ns": 40000000,
"memory_profiler_step": 1048576,
"max_untracked_memory": 1048576,
"memory_profiler_sample_probability": 0.01,
"use_uncompressed_cache": 0,
"readonly": 1,
"allow_ddl": 0,
},
)
query_id, timing_info = get_query_timing_info(random_id, conn)
return {
"query": sqlparse.format(query, reindent_aligned=True),
"timing": timing_info,
"flamegraphs": get_flamegraphs(query_id),
}
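
# Flushes the query log and looks up the profiled query by its analyze_query marker,
# returning its query_id and a dict of timing, size and memory stats.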
def get_query_timing_info(random_id: str, conn: Client) -> Tuple[str, Dict]:
conn.execute("SYSTEM FLUSH LOGS")
results = conn.execute(
"""
SELECT
query_id,
event_time,
query_duration_ms,
read_rows,
formatReadableSize(read_bytes) as read_size,
result_rows,
formatReadableSize(result_bytes) as result_size,
formatReadableSize(memory_usage) as memory_usage
FROM system.query_log
WHERE query NOT LIKE '%%query_log%%'
AND match(query, %(expr)s)
AND type = 'QueryFinish'
LIMIT 1
""",
{"expr": f"analyze_query:{random_id}"},
)
return (
results[0][0],
dict(
zip(
[
"query_id",
"event_time",
"query_duration_ms",
"read_rows",
"read_size",
"result_rows",
"result_size",
"memory_usage",
],
results[0],
)
),
)
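
# Renders flamegraph SVGs for a query by shelling out to the bundled clickhouse-flamegraph
# binary; on failure the error is reported to Sentry and an empty dict is returned.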
def get_flamegraphs(query_id: str) -> Dict:
try:
with tempfile.TemporaryDirectory() as tmpdirname:
subprocess.run(
[
CLICKHOUSE_FLAMEGRAPH_EXECUTABLE,
"--query-id",
query_id,
"--clickhouse-dsn",
f"http://{CLICKHOUSE_USER}:{CLICKHOUSE_PASSWORD}@{CLICKHOUSE_STABLE_HOST}:8123/",
"--console",
"--flamegraph-script",
FLAMEGRAPH_PL,
"--date-from",
"2021-01-01",
"--width",
"1900",
],
cwd=tmpdirname,
check=True,
)
flamegraphs = {}
for file_path in glob.glob(join(tmpdirname, "*/*/global*.svg")):
with open(file_path) as file:
flamegraphs[basename(file_path)] = file.read()
return flamegraphs
except Exception as err:
capture_exception(err)
return {}
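
# Illustrative usage sketch (not part of the original module): this is roughly how a
# caller would consume system_status(), printing each metric row. Running it directly
# assumes a reachable ClickHouse instance configured via the usual PostHog settings.
if __name__ == "__main__":
    for row in system_status():
        print(f"{row['metric']}: {row['value']}")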