diff --git a/.run/PostHog.run.xml b/.run/PostHog.run.xml
index 77bf3859b14..32624c2ec1c 100644
--- a/.run/PostHog.run.xml
+++ b/.run/PostHog.run.xml
@@ -5,7 +5,6 @@
-
@@ -16,7 +15,8 @@
-
+
+
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
index 3be63ed2bb0..5d1c289d1d9 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
@@ -340,6 +340,7 @@ export class SessionRecordingIngester {
assignedPartitions: this.assignedPartitions,
})
}
+
await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch`,
sendTimeoutGuardToSentry: false,
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts
index e0426a12670..c509ee2d145 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts
@@ -12,6 +12,27 @@ import { eventDroppedCounter } from '../metrics'
import { TeamIDWithConfig } from './session-recordings-consumer'
import { IncomingRecordingMessage, ParsedBatch, PersistedRecordingMessage } from './types'
+import { promisify } from 'node:util'
+import { unzip } from 'node:zlib'
+
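+// first four bytes written by gzip.compress in capture: magic 0x1f 0x8b, deflate method 0x08, empty flag byte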
+const GZIP_HEADER = Buffer.from([0x1f, 0x8b, 0x08, 0x00])
+
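+// cheap check so we only pay the unzip cost for payloads that capture actually compressed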
+function isGzipped(buffer: Buffer): boolean {
+ if (buffer.length < GZIP_HEADER.length) {
+ return false
+ }
+
+ for (let i = 0; i < GZIP_HEADER.length; i++) {
+ if (buffer[i] !== GZIP_HEADER[i]) {
+ return false
+ }
+ }
+
+ return true
+}
+
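+// zlib.unzip auto-detects gzip or deflate input; promisified so it can be awaited per message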
+const doUnzip = promisify(unzip)
+
const counterKafkaMessageReceived = new Counter({
name: 'recording_blob_ingestion_kafka_message_received',
help: 'The number of messages we have received from Kafka',
@@ -246,8 +267,17 @@ export const parseKafkaMessage = async (
let messagePayload: RawEventMessage
let event: PipelineEvent
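+    // capture can gzip the payload before producing (gzip-in-capture), so detect and decompress it before JSON parsing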
+ let messageUnzipped = message.value
try {
- messagePayload = JSON.parse(message.value.toString())
+ if (isGzipped(message.value)) {
+        messageUnzipped = await doUnzip(message.value)
+ }
+ } catch (error) {
+ return dropMessage('invalid_gzip_data', { error })
+ }
+
+ try {
+ messagePayload = JSON.parse(messageUnzipped.toString())
event = JSON.parse(messagePayload.data)
} catch (error) {
return dropMessage('invalid_json', { error })
diff --git a/posthog/api/capture.py b/posthog/api/capture.py
index 8fa6459e41f..7e7b9efbf8c 100644
--- a/posthog/api/capture.py
+++ b/posthog/api/capture.py
@@ -1,3 +1,4 @@
+import gzip
import json
import re
from random import random
@@ -15,19 +16,20 @@ from django.views.decorators.csrf import csrf_exempt
from enum import Enum
from kafka.errors import KafkaError, MessageSizeTooLargeError, KafkaTimeoutError
from kafka.producer.future import FutureRecordMetadata
-from prometheus_client import Counter, Gauge
+from prometheus_client import Counter, Gauge, Histogram
from rest_framework import status
from sentry_sdk import configure_scope
from sentry_sdk.api import capture_exception, start_span
from statshog.defaults.django import statsd
from token_bucket import Limiter, MemoryStorage
from typing import Any, Optional, Literal
+from collections.abc import Callable
from ee.billing.quota_limiting import QuotaLimitingCaches
from posthog.api.utils import get_data, get_token, safe_clickhouse_string
from posthog.cache_utils import cache_for
from posthog.exceptions import generate_exception_response
-from posthog.kafka_client.client import KafkaProducer, sessionRecordingKafkaProducer
+from posthog.kafka_client.client import KafkaProducer, session_recording_kafka_producer
from posthog.kafka_client.topics import (
KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL,
KAFKA_SESSION_RECORDING_EVENTS,
@@ -116,6 +118,12 @@ KAFKA_TIMEOUT_ERROR_COUNTER = Counter(
labelnames=["retry_count", "status_code"],
)
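+# labelled by compression mode so we can compare produce latency with and without gzip-in-capture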
+REPLAY_MESSAGE_PRODUCTION_TIMER = Histogram(
+ "capture_replay_message_production_seconds",
+ "Time taken to produce a set of replay messages",
+ labelnames=["compress_in_capture"],
+)
+
# This is a heuristic of ids we have seen used as anonymous. As they frequently
# have significantly more traffic than non-anonymous distinct_ids, and likely
# don't refer to the same underlying person we prefer to partition them randomly
@@ -200,6 +208,7 @@ def log_event(
headers: Optional[list] = None,
historical: bool = False,
overflowing: bool = False,
+ value_serializer: Optional[Callable[[Any], Any]] = None,
) -> FutureRecordMetadata:
kafka_topic = _kafka_topic(event_name, historical=historical, overflowing=overflowing)
@@ -208,11 +217,13 @@ def log_event(
# TODO: Handle Kafka being unavailable with exponential backoff retries
try:
if event_name in SESSION_RECORDING_DEDICATED_KAFKA_EVENTS:
- producer = sessionRecordingKafkaProducer()
+ producer = session_recording_kafka_producer()
else:
producer = KafkaProducer()
- future = producer.produce(topic=kafka_topic, data=data, key=partition_key, headers=headers)
+ future = producer.produce(
+ topic=kafka_topic, data=data, key=partition_key, headers=headers, value_serializer=value_serializer
+ )
statsd.incr("posthog_cloud_plugin_server_ingestion")
return future
except Exception:
@@ -556,36 +567,40 @@ def get_event(request):
# This is mostly a copy of above except we only log, we don't error out
if alternative_replay_events:
processed_events = list(preprocess_events(alternative_replay_events))
- for event, event_uuid, distinct_id in processed_events:
- capture_args = (
- event,
- distinct_id,
- ip,
- site_url,
- now,
- sent_at,
- event_uuid,
- token,
- )
- capture_kwargs = {
- "extra_headers": [("lib_version", lib_version)],
- }
- this_future = capture_internal(*capture_args, **capture_kwargs)
- replay_futures.append((this_future, capture_args, capture_kwargs))
+ compression_in_capture = _compress_in_capture(token)
+ with REPLAY_MESSAGE_PRODUCTION_TIMER.labels(compress_in_capture=compression_in_capture).time():
+ for event, event_uuid, distinct_id in processed_events:
+ capture_args = (
+ event,
+ distinct_id,
+ ip,
+ site_url,
+ now,
+ sent_at,
+ event_uuid,
+ token,
+ )
+ capture_kwargs = {
+ "extra_headers": [
+ ("lib_version", lib_version),
+ ],
+ }
+ this_future = capture_internal(*capture_args, **capture_kwargs)
+ replay_futures.append((this_future, capture_args, capture_kwargs))
- start_time = time.monotonic()
- for future, args, kwargs in replay_futures:
- if future is not None:
- try:
- future.get(
- timeout=settings.KAFKA_PRODUCE_ACK_TIMEOUT_SECONDS - (time.monotonic() - start_time)
- )
- except MessageSizeTooLargeError as mstle:
- REPLAY_MESSAGE_SIZE_TOO_LARGE_COUNTER.inc()
- warning_event = replace_with_warning(args[0], token, mstle, lib_version)
- if warning_event:
- warning_future = capture_internal(warning_event, *args[1:], **kwargs)
- warning_future.get(timeout=settings.KAFKA_PRODUCE_ACK_TIMEOUT_SECONDS)
+ start_time = time.monotonic()
+ for future, args, kwargs in replay_futures:
+ if future is not None:
+ try:
+ future.get(
+ timeout=settings.KAFKA_PRODUCE_ACK_TIMEOUT_SECONDS - (time.monotonic() - start_time)
+ )
+ except MessageSizeTooLargeError as mstle:
+ REPLAY_MESSAGE_SIZE_TOO_LARGE_COUNTER.inc()
+ warning_event = replace_with_warning(args[0], token, mstle, lib_version)
+ if warning_event:
+ warning_future = capture_internal(warning_event, *args[1:], **kwargs)
+ warning_future.get(timeout=settings.KAFKA_PRODUCE_ACK_TIMEOUT_SECONDS)
except ValueError as e:
with sentry_sdk.push_scope() as scope:
@@ -635,6 +650,17 @@ def get_event(request):
return cors_response(request, JsonResponse({"status": 1}))
+def _compress_in_capture(token: str | None) -> bool:
+    # this check is only here so that we can test in a limited way in production
+    # ultimately we'll only check this setting
+    return (
+        settings.DEBUG or settings.TEST or token == "sTMFPsFhdP1Ssg"
+    ) and settings.SESSION_RECORDING_KAFKA_COMPRESSION == "gzip-in-capture"
+
+
def replace_with_warning(
event: dict[str, Any], token: str, mstle: MessageSizeTooLargeError, lib_version: str
) -> dict[str, Any] | None:
@@ -799,9 +825,14 @@ def capture_internal(
token=token,
)
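+    # JSON-encode then gzip here in capture; blobby detects the gzip magic bytes and decompresses before parsing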
+ def gzip_json_serializer(data):
+ json_data = json.dumps(data).encode("utf-8")
+ return gzip.compress(json_data)
+
if event["event"] in SESSION_RECORDING_EVENT_NAMES:
session_id = event["properties"]["$session_id"]
headers = [("token", token), *extra_headers]
+        value_serializer = gzip_json_serializer if _compress_in_capture(token) else None
overflowing = False
if token in settings.REPLAY_OVERFLOW_FORCED_TOKENS:
@@ -810,7 +841,12 @@ def capture_internal(
overflowing = session_id in _list_overflowing_keys(InputType.REPLAY)
return log_event(
- parsed_event, event["event"], partition_key=session_id, headers=headers, overflowing=overflowing
+ parsed_event,
+ event["event"],
+ partition_key=session_id,
+ headers=headers,
+ overflowing=overflowing,
+ value_serializer=value_serializer,
)
# We aim to always partition by {team_id}:{distinct_id} but allow
diff --git a/posthog/api/test/test_capture.py b/posthog/api/test/test_capture.py
index 8512d73ab2f..7ddd8353a80 100644
--- a/posthog/api/test/test_capture.py
+++ b/posthog/api/test/test_capture.py
@@ -43,7 +43,7 @@ from posthog.api.capture import (
)
from posthog.api.test.mock_sentry import mock_sentry_context_for_tagging
from posthog.api.test.openapi_validation import validate_response
-from posthog.kafka_client.client import KafkaProducer, sessionRecordingKafkaProducer
+from posthog.kafka_client.client import KafkaProducer, session_recording_kafka_producer
from posthog.kafka_client.topics import (
KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL,
KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
@@ -371,6 +371,7 @@ class TestCapture(BaseTest):
data=ANY,
key=None if expect_random_partitioning else ANY,
headers=None,
+ value_serializer=None,
)
if not expect_random_partitioning:
@@ -1946,7 +1947,7 @@ class TestCapture(BaseTest):
self._send_august_2023_version_session_recording_event(event_data=None)
session_recording_producer_singleton_mock.assert_called_with(
- compression_type=None,
+ compression_type="gzip",
kafka_hosts=[
"another-server:9092",
"a-fourth.server:9092",
@@ -1955,7 +1956,7 @@ class TestCapture(BaseTest):
max_request_size=1234,
)
- @patch("posthog.api.capture.sessionRecordingKafkaProducer")
+ @patch("posthog.api.capture.session_recording_kafka_producer")
@patch("posthog.api.capture.KafkaProducer")
@patch("posthog.kafka_client.client._KafkaProducer.produce")
def test_can_redirect_session_recordings_to_alternative_kafka(
@@ -1972,7 +1973,7 @@ class TestCapture(BaseTest):
],
):
default_kafka_producer_mock.return_value = KafkaProducer()
- session_recording_producer_factory_mock.return_value = sessionRecordingKafkaProducer()
+ session_recording_producer_factory_mock.return_value = session_recording_kafka_producer()
session_id = "test_can_redirect_session_recordings_to_alternative_kafka"
# just a single thing to send (it should be an rrweb event but capture doesn't validate that)
@@ -1989,6 +1990,37 @@ class TestCapture(BaseTest):
assert data_sent_to_recording_kafka["event"] == "$snapshot_items"
assert len(data_sent_to_recording_kafka["properties"]["$snapshot_items"]) == 1
+ @patch("posthog.api.capture.session_recording_kafka_producer")
+ @patch("posthog.api.capture.KafkaProducer")
+ @patch("posthog.kafka_client.client._KafkaProducer.produce")
+ def test_can_compress_messages_before_kafka(
+ self,
+ kafka_produce: MagicMock,
+ _default_kafka_producer_mock: MagicMock,
+ session_recording_producer_factory_mock: MagicMock,
+ ) -> None:
+ with self.settings(
+ KAFKA_HOSTS=["first.server:9092", "second.server:9092"],
+ SESSION_RECORDING_KAFKA_HOSTS=[
+ "another-server:9092",
+ "a-fourth.server:9092",
+ ],
+ SESSION_RECORDING_KAFKA_COMPRESSION="gzip-in-capture",
+ ):
+ session_recording_producer_factory_mock.return_value = session_recording_kafka_producer()
+
+            session_id = "test_can_compress_messages_before_kafka"
+ self._send_august_2023_version_session_recording_event(event_data={}, session_id=session_id)
+
+ assert len(kafka_produce.call_args_list) == 1
+
+ call_one = kafka_produce.call_args_list[0][1]
+ assert call_one["key"] == session_id
+ assert call_one["value_serializer"] is not None
+
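+            # gzip.compress writes magic 0x1f 0x8b, deflate 0x08 and an empty flag byte - the header blobby checks for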
+ serialized = call_one["value_serializer"]({"i am": "a string"})
+ assert serialized.startswith(b"\x1f\x8b\x08\x00")
+
def test_get_distinct_id_non_json_properties(self) -> None:
with self.assertRaises(ValueError):
get_distinct_id({"properties": "str"})
diff --git a/posthog/kafka_client/client.py b/posthog/kafka_client/client.py
index f0008c4ba72..4f9ac7cec55 100644
--- a/posthog/kafka_client/client.py
+++ b/posthog/kafka_client/client.py
@@ -205,12 +205,15 @@ KafkaProducer = SingletonDecorator(_KafkaProducer)
SessionRecordingKafkaProducer = SingletonDecorator(_KafkaProducer)
-def sessionRecordingKafkaProducer() -> _KafkaProducer:
+def session_recording_kafka_producer() -> _KafkaProducer:
return SessionRecordingKafkaProducer(
kafka_hosts=settings.SESSION_RECORDING_KAFKA_HOSTS,
kafka_security_protocol=settings.SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL,
max_request_size=settings.SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES,
- compression_type=settings.SESSION_RECORDING_KAFKA_COMPRESSION,
+        # "gzip" is passed through to the kafka producer as-is
+        # with "gzip-in-capture" the payload is already compressed in capture, so don't compress it again here
+ compression_type=settings.SESSION_RECORDING_KAFKA_COMPRESSION
+ if settings.SESSION_RECORDING_KAFKA_COMPRESSION == "gzip"
+ else None,
)
diff --git a/posthog/settings/data_stores.py b/posthog/settings/data_stores.py
index 45b0e93dd98..de0902e3ca5 100644
--- a/posthog/settings/data_stores.py
+++ b/posthog/settings/data_stores.py
@@ -212,11 +212,6 @@ SESSION_RECORDING_KAFKA_HOSTS = _parse_kafka_hosts(os.getenv("SESSION_RECORDING_
# Useful if clickhouse is hosted outside the cluster.
KAFKA_HOSTS_FOR_CLICKHOUSE = _parse_kafka_hosts(os.getenv("KAFKA_URL_FOR_CLICKHOUSE", "")) or KAFKA_HOSTS
-# can set ('gzip', 'snappy', 'lz4', 'zstd' None)
-# NB if you want to set a compression you need to install it... the producer compresses not kafka
-# so, at time of writing only 'gzip' and None/'uncompressed' are available
-SESSION_RECORDING_KAFKA_COMPRESSION = os.getenv("SESSION_RECORDING_KAFKA_COMPRESSION", None)
-
# To support e.g. Multi-tenanted plans on Heroko, we support specifying a prefix for
# Kafka Topics. See
# https://devcenter.heroku.com/articles/multi-tenant-kafka-on-heroku#differences-to-dedicated-kafka-plans
diff --git a/posthog/settings/session_replay.py b/posthog/settings/session_replay.py
index 0141efc0c09..8ee13e7c36b 100644
--- a/posthog/settings/session_replay.py
+++ b/posthog/settings/session_replay.py
@@ -31,3 +31,13 @@ REPLAY_MESSAGE_TOO_LARGE_SAMPLE_RATE = get_from_env("REPLAY_MESSAGE_TOO_LARGE_SA
REPLAY_MESSAGE_TOO_LARGE_SAMPLE_BUCKET = get_from_env(
"REPLAY_MESSAGE_TOO_LARGE_SAMPLE_BUCKET", "posthog-cloud-prod-us-east-1-k8s-replay-samples"
)
+
+# NB compression happens in the producer, not in Kafka, so the chosen codec must be installed alongside the producer
+# accepted values:
+# * None - no compression
+# * gzip - gzip compression by the kafka producer (auto-decompressed by the consumer in blobby)
+# * gzip-in-capture - gzip compression in the capture service (manually decompressed by the consumer in blobby)
+#
+# gzip is the current default in production
+# TODO we can clean this up once we've tested the new gzip-in-capture compression and don't need a setting
+SESSION_RECORDING_KAFKA_COMPRESSION = get_from_env("SESSION_RECORDING_KAFKA_COMPRESSION", "gzip")