feat: separate blob ingestion topic (#16145)
parent 687190fa89 · commit 5e6af4300c
@@ -14,9 +14,9 @@
        <env name="KEA_VERBOSE_LOGGING" value="false" />
        <env name="PRINT_SQL" value="1" />
        <env name="PYTHONUNBUFFERED" value="1" />
        <env name="REPLAY_BLOB_INGESTION_TRAFFIC_RATIO" value="1.0" />
        <env name="SESSION_RECORDING_KAFKA_HOSTS" value="localhost" />
        <env name="SKIP_SERVICE_VERSION_REQUIREMENTS" value="1" />
        <env name="REPLAY_BLOB_INGESTION_TRAFFIC_RATIO" value="1" />
      </envs>
      <option name="SDK_HOME" value="$PROJECT_DIR$/env/bin/python" />
      <option name="SDK_NAME" value="Python 3.10 (posthog)" />
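The run configuration pins REPLAY_BLOB_INGESTION_TRAFFIC_RATIO to 1 locally so every recording exercises the new blob ingestion path. As a minimal sketch of how a rollout ratio like this is typically consumed, with the helper name and per-event sampling being illustrative assumptions rather than the code in this commit:

import random

REPLAY_BLOB_INGESTION_TRAFFIC_RATIO = 1.0  # assumed semantics: 0.0 = never, 1.0 = always

def should_mirror_to_blob_topic() -> bool:
    # Sample per event so only the configured share of replay traffic
    # is also produced to the new blob ingestion topic.
    return random.random() < REPLAY_BLOB_INGESTION_TRAFFIC_RATIO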
@@ -9,14 +9,10 @@ export const KAFKA_EVENTS_JSON = `${prefix}clickhouse_events_json${suffix}`
 export const KAFKA_PERSON = `${prefix}clickhouse_person${suffix}`
 export const KAFKA_PERSON_UNIQUE_ID = `${prefix}clickhouse_person_unique_id${suffix}`
 export const KAFKA_PERSON_DISTINCT_ID = `${prefix}clickhouse_person_distinct_id${suffix}`
-export const KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = `${prefix}clickhouse_session_recording_events${suffix}`
-export const KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = `${prefix}clickhouse_session_replay_events${suffix}`
-export const KAFKA_PERFORMANCE_EVENTS = `${prefix}clickhouse_performance_events${suffix}`
+
 export const KAFKA_EVENTS_PLUGIN_INGESTION = `${prefix}events_plugin_ingestion${suffix}`
 export const KAFKA_EVENTS_PLUGIN_INGESTION_DLQ = `${prefix}events_plugin_ingestion_dlq${suffix}`
 export const KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW = `${prefix}events_plugin_ingestion_overflow${suffix}`
-export const KAFKA_SESSION_RECORDING_EVENTS = `${prefix}session_recording_events${suffix}`
-export const KAFKA_SESSION_RECORDING_EVENTS_DLQ = `${prefix}session_recording_events_dlq${suffix}`
 export const KAFKA_PLUGIN_LOG_ENTRIES = `${prefix}plugin_log_entries${suffix}`
 export const KAFKA_EVENTS_DEAD_LETTER_QUEUE = `${prefix}events_dead_letter_queue${suffix}`
 export const KAFKA_GROUPS = `${prefix}clickhouse_groups${suffix}`
@@ -29,3 +25,14 @@ export const KAFKA_SCHEDULED_TASKS = `${prefix}scheduled_tasks${suffix}`
 export const KAFKA_SCHEDULED_TASKS_DLQ = `${prefix}scheduled_tasks_dlq${suffix}`
 export const KAFKA_METRICS_TIME_TO_SEE_DATA = `${prefix}clickhouse_metrics_time_to_see_data${suffix}`
 export const KAFKA_PERSON_OVERRIDE = `${prefix}clickhouse_person_override${suffix}`
+
+// read session recording events from Kafka
+export const KAFKA_SESSION_RECORDING_EVENTS = `${prefix}session_recording_events${suffix}`
+export const KAFKA_SESSION_RECORDING_EVENTS_DLQ = `${prefix}session_recording_events_dlq${suffix}`
+// read session recording snapshot items
+export const KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS = `${prefix}session_recording_snapshot_item_events${suffix}`
+// write session recording and replay events to ClickHouse
+export const KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = `${prefix}clickhouse_session_recording_events${suffix}`
+export const KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = `${prefix}clickhouse_session_replay_events${suffix}`
+// write performance events to ClickHouse
+export const KAFKA_PERFORMANCE_EVENTS = `${prefix}clickhouse_performance_events${suffix}`
@@ -6,7 +6,10 @@ import { CODES, HighLevelProducer as RdKafkaProducer, Message } from 'node-rdkaf
 import path from 'path'
 import { Gauge } from 'prom-client'
 
-import { KAFKA_SESSION_RECORDING_EVENTS } from '../../../config/kafka-topics'
+import {
+    KAFKA_SESSION_RECORDING_EVENTS,
+    KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
+} from '../../../config/kafka-topics'
 import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
 import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
 import { createKafkaProducer, disconnectProducer } from '../../../kafka/producer'
@@ -247,7 +250,7 @@ export class SessionRecordingBlobIngester {
         this.batchConsumer = await startBatchConsumer({
             connectionConfig,
             groupId,
-            topic: KAFKA_SESSION_RECORDING_EVENTS,
+            topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
             sessionTimeout,
             consumerMaxBytes: this.serverConfig.KAFKA_CONSUMPTION_MAX_BYTES,
             consumerMaxBytesPerPartition: this.serverConfig.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
@@ -28,7 +28,7 @@ from posthog.kafka_client.client import (
     KafkaProducer,
     sessionRecordingKafkaProducer,
 )
-from posthog.kafka_client.topics import KAFKA_SESSION_RECORDING_EVENTS
+from posthog.kafka_client.topics import KAFKA_SESSION_RECORDING_EVENTS, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS
 from posthog.logging.timing import timed
 from posthog.metrics import LABEL_RESOURCE_TYPE
 from posthog.models.utils import UUIDT
@@ -153,14 +153,21 @@ def build_kafka_event_data(
     }


-def log_event(data: Dict, event_name: str, partition_key: Optional[str]):
+def _kafka_topic(event_name: str) -> str:
     # To allow for different quality of service on session recordings
     # and other events, we push to a different topic.
-    kafka_topic = (
-        KAFKA_SESSION_RECORDING_EVENTS
-        if event_name in SESSION_RECORDING_EVENT_NAMES
-        else settings.KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC
-    )
+
+    match event_name:
+        case "$snapshot":
+            return KAFKA_SESSION_RECORDING_EVENTS
+        case "$snapshot_items":
+            return KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS
+        case _:
+            return settings.KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC
+
+
+def log_event(data: Dict, event_name: str, partition_key: Optional[str]):
+    kafka_topic = _kafka_topic(event_name)

     logger.debug("logging_event", event_name=event_name, kafka_topic=kafka_topic)

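The new _kafka_topic helper keeps the routing decision in one place: legacy $snapshot events stay on the existing recordings topic, the new $snapshot_items events go to the blob ingestion topic, and everything else goes to the analytics ingestion topic. A self-contained sketch of that routing for illustration, with the topic names inlined as plain strings instead of the constants imported from posthog.kafka_client.topics and settings:

def route_event(event_name: str) -> str:
    # Mirrors the match statement above: only the event name picks the topic.
    match event_name:
        case "$snapshot":
            return "session_recording_events"
        case "$snapshot_items":
            return "session_recording_snapshot_item_events"
        case _:
            return "events_plugin_ingestion"

assert route_event("$snapshot_items") == "session_recording_snapshot_item_events"
assert route_event("$pageview") == "events_plugin_ingestion"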
@@ -34,7 +34,7 @@ from posthog.api.capture import (
 from posthog.api.test.mock_sentry import mock_sentry_context_for_tagging
 from posthog.api.test.openapi_validation import validate_response
 from posthog.kafka_client.client import KafkaProducer, sessionRecordingKafkaProducer
-from posthog.kafka_client.topics import KAFKA_SESSION_RECORDING_EVENTS
+from posthog.kafka_client.topics import KAFKA_SESSION_RECORDING_EVENTS, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS
 from posthog.settings import (
     DATA_UPLOAD_MAX_MEMORY_SIZE,
     KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC,
@@ -50,6 +50,11 @@ parser = ResolvingParser(url=str(pathlib.Path(__file__).parent / "../../../opena
 openapi_spec = cast(Dict[str, Any], parser.specification)


+large_data_array = [
+    random.choice(string.ascii_letters) for _ in range(700 * 1024)
+] # 512 * 1024 is the max size of a single message and random letters shouldn't be compressible, so this should be at least 2 messages
+
+
 class TestCapture(BaseTest):
     """
     Tests all data capture endpoints (e.g. `/capture` `/batch/`).
@@ -1241,12 +1246,37 @@ class TestCapture(BaseTest):

     @patch("posthog.kafka_client.client._KafkaProducer.produce")
     def test_legacy_recording_ingestion_large_is_split_into_multiple_messages(self, kafka_produce) -> None:
-        data = [
-            random.choice(string.ascii_letters) for _ in range(700 * 1024)
-        ] # 512 * 1024 is the max size of a single message and random letters shouldn't be compressible, so this should be at least 2 messages
-        self._send_session_recording_event(event_data=data)
+        self._send_session_recording_event(event_data=large_data_array)
         topic_counter = Counter([call[1]["topic"] for call in kafka_produce.call_args_list])
-        self.assertGreater(topic_counter[KAFKA_SESSION_RECORDING_EVENTS], 1)
+
+        assert topic_counter == Counter({KAFKA_SESSION_RECORDING_EVENTS: 3})
+
+    @patch("posthog.kafka_client.client._KafkaProducer.produce")
+    def test_recording_ingestion_can_write_to_blob_ingestion_topic_with_usual_size_limit(self, kafka_produce) -> None:
+        with self.settings(
+            REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=1,
+            SESSION_RECORDING_KAFKA_MAX_MESSAGE_BYTES="512",
+        ):
+            self._send_session_recording_event(event_data=large_data_array)
+            topic_counter = Counter([call[1]["topic"] for call in kafka_produce.call_args_list])
+
+            # this fake data doesn't split, so we send one huge message to the item events topic
+            assert topic_counter == Counter(
+                {KAFKA_SESSION_RECORDING_EVENTS: 3, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS: 1}
+            )
+
+    @patch("posthog.kafka_client.client._KafkaProducer.produce")
+    def test_recording_ingestion_can_write_to_blob_ingestion_topic(self, kafka_produce) -> None:
+        with self.settings(
+            REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=1,
+            SESSION_RECORDING_KAFKA_MAX_MESSAGE_BYTES="20480",
+        ):
+            self._send_session_recording_event(event_data=large_data_array)
+            topic_counter = Counter([call[1]["topic"] for call in kafka_produce.call_args_list])
+
+            assert topic_counter == Counter(
+                {KAFKA_SESSION_RECORDING_EVENTS: 3, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS: 1}
+            )
+
     @patch("posthog.kafka_client.client.SessionRecordingKafkaProducer")
     def test_create_session_recording_kafka_with_expected_hosts(
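The size arithmetic behind these tests: large_data_array is 700 * 1024 random letters, roughly 700 KiB of incompressible data, while the legacy recordings topic caps messages at 512 KiB, so the payload has to be split across at least two messages (the exact count of 3 in the assertions comes from how the capture endpoint chunks snapshot data, which is not shown in this diff). A quick sketch of the lower bound:

import math

payload_bytes = 700 * 1024      # incompressible random letters from large_data_array
max_message_bytes = 512 * 1024  # per-message cap assumed for the legacy recordings topic

# Minimum number of Kafka messages once the payload is split by size.
min_messages = math.ceil(payload_bytes / max_message_bytes)
assert min_messages >= 2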
@@ -9,8 +9,6 @@ KAFKA_PERSON = f"{KAFKA_PREFIX}clickhouse_person{SUFFIX}"
 KAFKA_PERSON_UNIQUE_ID = f"{KAFKA_PREFIX}clickhouse_person_unique_id{SUFFIX}" # DEPRECATED_DO_NOT_USE
 KAFKA_PERSON_DISTINCT_ID = f"{KAFKA_PREFIX}clickhouse_person_distinct_id{SUFFIX}"
 KAFKA_PERSON_OVERRIDES = f"{KAFKA_PREFIX}clickhouse_person_override{SUFFIX}"
-KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = f"{KAFKA_PREFIX}clickhouse_session_recording_events{SUFFIX}"
-KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = f"{KAFKA_PREFIX}clickhouse_session_replay_events{SUFFIX}"
 KAFKA_PERFORMANCE_EVENTS = f"{KAFKA_PREFIX}clickhouse_performance_events{SUFFIX}"
 KAFKA_PLUGIN_LOG_ENTRIES = f"{KAFKA_PREFIX}plugin_log_entries{SUFFIX}"
 KAFKA_DEAD_LETTER_QUEUE = f"{KAFKA_PREFIX}events_dead_letter_queue{SUFFIX}"
@@ -18,5 +16,12 @@ KAFKA_GROUPS = f"{KAFKA_PREFIX}clickhouse_groups{SUFFIX}"
 KAFKA_INGESTION_WARNINGS = f"{KAFKA_PREFIX}clickhouse_ingestion_warnings{SUFFIX}"
 KAFKA_APP_METRICS = f"{KAFKA_PREFIX}clickhouse_app_metrics{SUFFIX}"
 KAFKA_METRICS_TIME_TO_SEE_DATA = f"{KAFKA_PREFIX}clickhouse_metrics_time_to_see_data{SUFFIX}"
-KAFKA_SESSION_RECORDING_EVENTS = f"{KAFKA_PREFIX}session_recording_events{SUFFIX}"
 KAFKA_PERSON_OVERRIDE = f"{KAFKA_PREFIX}clickhouse_person_override{SUFFIX}"
+
+# from capture to recordings consumer
+KAFKA_SESSION_RECORDING_EVENTS = f"{KAFKA_PREFIX}session_recording_events{SUFFIX}"
+# from capture to recordings blob ingestion consumer
+KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS = f"{KAFKA_PREFIX}session_recording_snapshot_item_events{SUFFIX}"
+# from recordings consumer to clickhouse
+KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = f"{KAFKA_PREFIX}clickhouse_session_replay_events{SUFFIX}"
+KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = f"{KAFKA_PREFIX}clickhouse_session_recording_events{SUFFIX}"
@@ -193,7 +193,7 @@ def preprocess_replay_events(events: List[Event], max_size_bytes=1024 * 1024) ->
     else:
         snapshot_data_list = list(flatten([event["properties"]["$snapshot_data"] for event in events], max_depth=1))

-    # 2. Otherwise try and group all the events if they are small enough
+    # 2. Otherwise, try and group all the events if they are small enough
     if byte_size_dict(snapshot_data_list) < max_size_bytes:
         event = new_event(snapshot_data_list)
         yield event
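For context on the grouping step touched here: replay events are only merged into a single message when their combined snapshot data fits under max_size_bytes (1 MiB by default, per the signature above). A sketch of a size-gated grouping check, assuming byte_size_dict measures roughly the JSON-serialized size; the real helper is not shown in this diff:

import json
from typing import Any, List

def approx_byte_size(value: Any) -> int:
    # Assumption: byte_size_dict is close to the size of the JSON serialization.
    return len(json.dumps(value).encode("utf-8"))

def can_group(snapshot_data_list: List[Any], max_size_bytes: int = 1024 * 1024) -> bool:
    # Group all events into one message only if they fit under the size budget.
    return approx_byte_size(snapshot_data_list) < max_size_bytes

print(can_group([{"type": 2, "data": {}}]))  # True for small payloads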