mirror of https://github.com/PostHog/posthog.git synced 2024-11-24 00:47:50 +01:00

feat: using gzip by hand in the replay pipeline (#23479)

Paul D'Ambra 2024-07-08 07:48:20 +01:00 committed by GitHub
parent d3026887df
commit 432396c170
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 155 additions and 48 deletions
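In short: for session replay, capture can now gzip the Kafka message value itself ("by hand") instead of relying on producer-level compression, and the blob ingester detects the gzip magic bytes and decompresses before parsing. A minimal end-to-end sketch of that idea using only the standard library (the function names here are illustrative, not the ones added in this commit):

    import gzip
    import json

    GZIP_MAGIC = b"\x1f\x8b\x08\x00"  # magic bytes, deflate method, no flag bits

    def compress_payload(data: dict) -> bytes:
        # capture side: serialize and gzip before handing the bytes to the producer
        return gzip.compress(json.dumps(data).encode("utf-8"))

    def parse_payload(raw: bytes) -> dict:
        # consumer side: decompress only when the gzip header is present,
        # so uncompressed (or producer-compressed) messages keep working
        if raw.startswith(GZIP_MAGIC):
            raw = gzip.decompress(raw)
        return json.loads(raw.decode("utf-8"))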

View File

@@ -5,7 +5,6 @@
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
<env name="BILLING_SERVICE_URL" value="https://billing.dev.posthog.dev" />
<env name="CAPTURE_TIME_TO_SEE_DATA" value="0" />
<env name="CLICKHOUSE_SECURE" value="False" />
@@ -16,7 +15,8 @@
<env name="KEA_VERBOSE_LOGGING" value="false" />
<env name="PRINT_SQL" value="1" />
<env name="PYDEVD_USE_CYTHON" value="NO" />
<env name="SESSION_RECORDING_KAFKA_COMPRESSION" value="gzip" />
<env name="PYTHONUNBUFFERED" value="1" />
<env name="SESSION_RECORDING_KAFKA_COMPRESSION" value="gzip-in-capture" />
<env name="SESSION_RECORDING_KAFKA_HOSTS" value="localhost" />
<env name="SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES" value="20971520" />
<env name="SKIP_SERVICE_VERSION_REQUIREMENTS" value="1" />

View File

@@ -340,6 +340,7 @@ export class SessionRecordingIngester {
assignedPartitions: this.assignedPartitions,
})
}
await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch`,
sendTimeoutGuardToSentry: false,

View File

@@ -12,6 +12,27 @@ import { eventDroppedCounter } from '../metrics'
import { TeamIDWithConfig } from './session-recordings-consumer'
import { IncomingRecordingMessage, ParsedBatch, PersistedRecordingMessage } from './types'
const { promisify } = require('node:util')
const { unzip } = require('node:zlib')
const GZIP_HEADER = Buffer.from([0x1f, 0x8b, 0x08, 0x00])
function isGzipped(buffer: Buffer): boolean {
if (buffer.length < GZIP_HEADER.length) {
return false
}
for (let i = 0; i < GZIP_HEADER.length; i++) {
if (buffer[i] !== GZIP_HEADER[i]) {
return false
}
}
return true
}
const do_unzip = promisify(unzip)
const counterKafkaMessageReceived = new Counter({
name: 'recording_blob_ingestion_kafka_message_received',
help: 'The number of messages we have received from Kafka',
@@ -246,8 +267,17 @@ export const parseKafkaMessage = async (
let messagePayload: RawEventMessage
let event: PipelineEvent
let messageUnzipped = message.value
try {
messagePayload = JSON.parse(message.value.toString())
if (isGzipped(message.value)) {
messageUnzipped = await do_unzip(message.value)
}
} catch (error) {
return dropMessage('invalid_gzip_data', { error })
}
try {
messagePayload = JSON.parse(messageUnzipped.toString())
event = JSON.parse(messagePayload.data)
} catch (error) {
return dropMessage('invalid_json', { error })
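The four header bytes checked above line up with what Python's gzip module writes on the capture side: 0x1f 0x8b is the gzip magic number, 0x08 the deflate method, and 0x00 means no optional flags (no filename, comment or extra fields). A quick sanity check of that assumption, sketched in Python rather than in this consumer's TypeScript:

    import gzip

    blob = gzip.compress(b'{"distinct_id": "abc"}')
    assert blob[:4] == b"\x1f\x8b\x08\x00"  # magic, deflate, no flag bits set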

View File

@@ -1,3 +1,4 @@
import gzip
import json
import re
from random import random
@@ -15,19 +16,20 @@ from django.views.decorators.csrf import csrf_exempt
from enum import Enum
from kafka.errors import KafkaError, MessageSizeTooLargeError, KafkaTimeoutError
from kafka.producer.future import FutureRecordMetadata
from prometheus_client import Counter, Gauge
from prometheus_client import Counter, Gauge, Histogram
from rest_framework import status
from sentry_sdk import configure_scope
from sentry_sdk.api import capture_exception, start_span
from statshog.defaults.django import statsd
from token_bucket import Limiter, MemoryStorage
from typing import Any, Optional, Literal
from collections.abc import Callable
from ee.billing.quota_limiting import QuotaLimitingCaches
from posthog.api.utils import get_data, get_token, safe_clickhouse_string
from posthog.cache_utils import cache_for
from posthog.exceptions import generate_exception_response
from posthog.kafka_client.client import KafkaProducer, sessionRecordingKafkaProducer
from posthog.kafka_client.client import KafkaProducer, session_recording_kafka_producer
from posthog.kafka_client.topics import (
KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL,
KAFKA_SESSION_RECORDING_EVENTS,
@@ -116,6 +118,12 @@ KAFKA_TIMEOUT_ERROR_COUNTER = Counter(
labelnames=["retry_count", "status_code"],
)
REPLAY_MESSAGE_PRODUCTION_TIMER = Histogram(
"capture_replay_message_production_seconds",
"Time taken to produce a set of replay messages",
labelnames=["compress_in_capture"],
)
# This is a heuristic of ids we have seen used as anonymous. As they frequently
# have significantly more traffic than non-anonymous distinct_ids, and likely
# don't refer to the same underlying person, we prefer to partition them randomly
@@ -200,6 +208,7 @@ def log_event(
headers: Optional[list] = None,
historical: bool = False,
overflowing: bool = False,
value_serializer: Optional[Callable[[Any], Any]] = None,
) -> FutureRecordMetadata:
kafka_topic = _kafka_topic(event_name, historical=historical, overflowing=overflowing)
@@ -208,11 +217,13 @@
# TODO: Handle Kafka being unavailable with exponential backoff retries
try:
if event_name in SESSION_RECORDING_DEDICATED_KAFKA_EVENTS:
producer = sessionRecordingKafkaProducer()
producer = session_recording_kafka_producer()
else:
producer = KafkaProducer()
future = producer.produce(topic=kafka_topic, data=data, key=partition_key, headers=headers)
future = producer.produce(
topic=kafka_topic, data=data, key=partition_key, headers=headers, value_serializer=value_serializer
)
statsd.incr("posthog_cloud_plugin_server_ingestion")
return future
except Exception:
@@ -556,36 +567,40 @@ def get_event(request):
# This is mostly a copy of the above, except we only log and don't error out
if alternative_replay_events:
processed_events = list(preprocess_events(alternative_replay_events))
for event, event_uuid, distinct_id in processed_events:
capture_args = (
event,
distinct_id,
ip,
site_url,
now,
sent_at,
event_uuid,
token,
)
capture_kwargs = {
"extra_headers": [("lib_version", lib_version)],
}
this_future = capture_internal(*capture_args, **capture_kwargs)
replay_futures.append((this_future, capture_args, capture_kwargs))
compression_in_capture = _compress_in_capture(token)
with REPLAY_MESSAGE_PRODUCTION_TIMER.labels(compress_in_capture=compression_in_capture).time():
for event, event_uuid, distinct_id in processed_events:
capture_args = (
event,
distinct_id,
ip,
site_url,
now,
sent_at,
event_uuid,
token,
)
capture_kwargs = {
"extra_headers": [
("lib_version", lib_version),
],
}
this_future = capture_internal(*capture_args, **capture_kwargs)
replay_futures.append((this_future, capture_args, capture_kwargs))
start_time = time.monotonic()
for future, args, kwargs in replay_futures:
if future is not None:
try:
future.get(
timeout=settings.KAFKA_PRODUCE_ACK_TIMEOUT_SECONDS - (time.monotonic() - start_time)
)
except MessageSizeTooLargeError as mstle:
REPLAY_MESSAGE_SIZE_TOO_LARGE_COUNTER.inc()
warning_event = replace_with_warning(args[0], token, mstle, lib_version)
if warning_event:
warning_future = capture_internal(warning_event, *args[1:], **kwargs)
warning_future.get(timeout=settings.KAFKA_PRODUCE_ACK_TIMEOUT_SECONDS)
start_time = time.monotonic()
for future, args, kwargs in replay_futures:
if future is not None:
try:
future.get(
timeout=settings.KAFKA_PRODUCE_ACK_TIMEOUT_SECONDS - (time.monotonic() - start_time)
)
except MessageSizeTooLargeError as mstle:
REPLAY_MESSAGE_SIZE_TOO_LARGE_COUNTER.inc()
warning_event = replace_with_warning(args[0], token, mstle, lib_version)
if warning_event:
warning_future = capture_internal(warning_event, *args[1:], **kwargs)
warning_future.get(timeout=settings.KAFKA_PRODUCE_ACK_TIMEOUT_SECONDS)
except ValueError as e:
with sentry_sdk.push_scope() as scope:
@@ -635,6 +650,17 @@ def get_event(request):
return cors_response(request, JsonResponse({"status": 1}))
def _compress_in_capture(token: str | None) -> bool:
if (
# this check is only here so that we can test in a limited way in production
# ultimately we'll only check this setting
(settings.DEBUG or settings.TEST or token == "sTMFPsFhdP1Ssg")
and settings.SESSION_RECORDING_KAFKA_COMPRESSION == "gzip-in-capture"
):
return True
return False
def replace_with_warning(
event: dict[str, Any], token: str, mstle: MessageSizeTooLargeError, lib_version: str
) -> dict[str, Any] | None:
@@ -799,9 +825,14 @@ def capture_internal(
token=token,
)
def gzip_json_serializer(data):
json_data = json.dumps(data).encode("utf-8")
return gzip.compress(json_data)
if event["event"] in SESSION_RECORDING_EVENT_NAMES:
session_id = event["properties"]["$session_id"]
headers = [("token", token), *extra_headers]
value_serializer = gzip_json_serializer if (_compress_in_capture(token)) else None
overflowing = False
if token in settings.REPLAY_OVERFLOW_FORCED_TOKENS:
@@ -810,7 +841,12 @@
overflowing = session_id in _list_overflowing_keys(InputType.REPLAY)
return log_event(
parsed_event, event["event"], partition_key=session_id, headers=headers, overflowing=overflowing
parsed_event,
event["event"],
partition_key=session_id,
headers=headers,
overflowing=overflowing,
value_serializer=value_serializer,
)
# We aim to always partition by {team_id}:{distinct_id} but allow
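Because the value_serializer hook hands the producer bytes that are already gzipped, producer-level compression is switched off elsewhere in this change. A self-contained sketch of the same pattern using kafka-python directly (PostHog's wrapper threads value_serializer through per call; the broker address and topic name below are placeholders):

    import gzip
    import json

    from kafka import KafkaProducer  # kafka-python

    def gzip_json_serializer(data):
        return gzip.compress(json.dumps(data).encode("utf-8"))

    producer = KafkaProducer(
        bootstrap_servers=["localhost:9092"],  # placeholder broker
        value_serializer=gzip_json_serializer,
        compression_type=None,  # the payload is already gzipped, avoid compressing twice
    )
    producer.send("session_recording_snapshot_item_events", {"$session_id": "abc"})  # illustrative topic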

View File

@@ -43,7 +43,7 @@ from posthog.api.capture import (
)
from posthog.api.test.mock_sentry import mock_sentry_context_for_tagging
from posthog.api.test.openapi_validation import validate_response
from posthog.kafka_client.client import KafkaProducer, sessionRecordingKafkaProducer
from posthog.kafka_client.client import KafkaProducer, session_recording_kafka_producer
from posthog.kafka_client.topics import (
KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL,
KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
@@ -371,6 +371,7 @@ class TestCapture(BaseTest):
data=ANY,
key=None if expect_random_partitioning else ANY,
headers=None,
value_serializer=None,
)
if not expect_random_partitioning:
@@ -1946,7 +1947,7 @@ class TestCapture(BaseTest):
self._send_august_2023_version_session_recording_event(event_data=None)
session_recording_producer_singleton_mock.assert_called_with(
compression_type=None,
compression_type="gzip",
kafka_hosts=[
"another-server:9092",
"a-fourth.server:9092",
@@ -1955,7 +1956,7 @@
max_request_size=1234,
)
@patch("posthog.api.capture.sessionRecordingKafkaProducer")
@patch("posthog.api.capture.session_recording_kafka_producer")
@patch("posthog.api.capture.KafkaProducer")
@patch("posthog.kafka_client.client._KafkaProducer.produce")
def test_can_redirect_session_recordings_to_alternative_kafka(
@@ -1972,7 +1973,7 @@
],
):
default_kafka_producer_mock.return_value = KafkaProducer()
session_recording_producer_factory_mock.return_value = sessionRecordingKafkaProducer()
session_recording_producer_factory_mock.return_value = session_recording_kafka_producer()
session_id = "test_can_redirect_session_recordings_to_alternative_kafka"
# just a single thing to send (it should be an rrweb event but capture doesn't validate that)
@@ -1989,6 +1990,37 @@
assert data_sent_to_recording_kafka["event"] == "$snapshot_items"
assert len(data_sent_to_recording_kafka["properties"]["$snapshot_items"]) == 1
@patch("posthog.api.capture.session_recording_kafka_producer")
@patch("posthog.api.capture.KafkaProducer")
@patch("posthog.kafka_client.client._KafkaProducer.produce")
def test_can_compress_messages_before_kafka(
self,
kafka_produce: MagicMock,
_default_kafka_producer_mock: MagicMock,
session_recording_producer_factory_mock: MagicMock,
) -> None:
with self.settings(
KAFKA_HOSTS=["first.server:9092", "second.server:9092"],
SESSION_RECORDING_KAFKA_HOSTS=[
"another-server:9092",
"a-fourth.server:9092",
],
SESSION_RECORDING_KAFKA_COMPRESSION="gzip-in-capture",
):
session_recording_producer_factory_mock.return_value = session_recording_kafka_producer()
session_id = "test_can_redirect_session_recordings_to_alternative_kafka"
self._send_august_2023_version_session_recording_event(event_data={}, session_id=session_id)
assert len(kafka_produce.call_args_list) == 1
call_one = kafka_produce.call_args_list[0][1]
assert call_one["key"] == session_id
assert call_one["value_serializer"] is not None
serialized = call_one["value_serializer"]({"i am": "a string"})
assert serialized.startswith(b"\x1f\x8b\x08\x00")
def test_get_distinct_id_non_json_properties(self) -> None:
with self.assertRaises(ValueError):
get_distinct_id({"properties": "str"})
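Beyond checking the gzip header bytes, the new compression test above could also round-trip the serialized value; a sketch of that extra assertion, standard library only:

    import gzip
    import json

    payload = {"i am": "a string"}
    serialized = gzip.compress(json.dumps(payload).encode("utf-8"))
    assert json.loads(gzip.decompress(serialized)) == payload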

View File

@@ -205,12 +205,15 @@ KafkaProducer = SingletonDecorator(_KafkaProducer)
SessionRecordingKafkaProducer = SingletonDecorator(_KafkaProducer)
def sessionRecordingKafkaProducer() -> _KafkaProducer:
def session_recording_kafka_producer() -> _KafkaProducer:
return SessionRecordingKafkaProducer(
kafka_hosts=settings.SESSION_RECORDING_KAFKA_HOSTS,
kafka_security_protocol=settings.SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL,
max_request_size=settings.SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES,
compression_type=settings.SESSION_RECORDING_KAFKA_COMPRESSION,
# if the message has already been serialized with gzip, we don't need to double compress it
compression_type=settings.SESSION_RECORDING_KAFKA_COMPRESSION
if settings.SESSION_RECORDING_KAFKA_COMPRESSION == "gzip"
else None,
)
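Why compression_type is forced to None for gzip-in-capture: gzip applied to already-gzipped bytes cannot shrink them further and only adds header/trailer overhead. A quick illustration (sketch; exact sizes vary with the payload):

    import gzip
    import json

    raw = json.dumps({"$snapshot_items": ["x" * 1000]}).encode("utf-8")
    once = gzip.compress(raw)
    twice = gzip.compress(once)
    # the second pass cannot compress the already-gzipped bytes and adds framing overhead
    print(len(raw), len(once), len(twice))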

View File

@@ -212,11 +212,6 @@ SESSION_RECORDING_KAFKA_HOSTS = _parse_kafka_hosts(os.getenv("SESSION_RECORDING_
# Useful if clickhouse is hosted outside the cluster.
KAFKA_HOSTS_FOR_CLICKHOUSE = _parse_kafka_hosts(os.getenv("KAFKA_URL_FOR_CLICKHOUSE", "")) or KAFKA_HOSTS
# can set ('gzip', 'snappy', 'lz4', 'zstd' None)
# NB if you want to set a compression you need to install it... the producer compresses not kafka
# so, at time of writing only 'gzip' and None/'uncompressed' are available
SESSION_RECORDING_KAFKA_COMPRESSION = os.getenv("SESSION_RECORDING_KAFKA_COMPRESSION", None)
# To support e.g. multi-tenanted plans on Heroku, we support specifying a prefix for
# Kafka Topics. See
# https://devcenter.heroku.com/articles/multi-tenant-kafka-on-heroku#differences-to-dedicated-kafka-plans

View File

@@ -31,3 +31,13 @@ REPLAY_MESSAGE_TOO_LARGE_SAMPLE_RATE = get_from_env("REPLAY_MESSAGE_TOO_LARGE_SA
REPLAY_MESSAGE_TOO_LARGE_SAMPLE_BUCKET = get_from_env(
"REPLAY_MESSAGE_TOO_LARGE_SAMPLE_BUCKET", "posthog-cloud-prod-us-east-1-k8s-replay-samples"
)
# NB: to use a compression type you need the corresponding library installed; the producer does the compressing, not Kafka
# accepts
# * None - no compression
# * gzip - gzip compression by the kafka producer (auto decompressed by the consumer in blobby)
# * gzip-in-capture - gzip compression in the capture service (manually decompressed by the consumer in blobby)
#
# gzip is the current default in production
# TODO we can clean this up once we've tested the new gzip-in-capture compression and don't need a setting
SESSION_RECORDING_KAFKA_COMPRESSION = get_from_env("SESSION_RECORDING_KAFKA_COMPRESSION", "gzip")
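How the three values are intended to drive behaviour, paraphrased from the capture and kafka_client changes above (settings access simplified to os.getenv for the sketch):

    import os

    compression = os.getenv("SESSION_RECORDING_KAFKA_COMPRESSION", "gzip")

    if compression == "gzip":
        # the Kafka producer compresses; the consumer library decompresses automatically
        producer_compression_type, compress_in_capture = "gzip", False
    elif compression == "gzip-in-capture":
        # capture gzips the value itself; producer compression is switched off so the
        # payload is not compressed twice, and blobby decompresses it manually
        producer_compression_type, compress_in_capture = None, True
    else:
        # no compression anywhere
        producer_compression_type, compress_in_capture = None, False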