diff --git a/.run/PostHog.run.xml b/.run/PostHog.run.xml
index fd66479a060..d637d92aabe 100644
--- a/.run/PostHog.run.xml
+++ b/.run/PostHog.run.xml
@@ -14,9 +14,11 @@
-
-
+
+
+
+
diff --git a/posthog/api/capture.py b/posthog/api/capture.py
index 444dbac9a9b..f19ef5d5115 100644
--- a/posthog/api/capture.py
+++ b/posthog/api/capture.py
@@ -58,7 +58,6 @@ LOG_RATE_LIMITER = Limiter(
SESSION_RECORDING_DEDICATED_KAFKA_EVENTS = ("$snapshot_items",)
SESSION_RECORDING_EVENT_NAMES = ("$snapshot", "$performance_event") + SESSION_RECORDING_DEDICATED_KAFKA_EVENTS
-
EVENTS_RECEIVED_COUNTER = Counter(
"capture_events_received_total",
"Events received by capture, tagged by resource type.",
@@ -102,7 +101,6 @@ REPLAY_INGESTION_BATCH_COMPRESSION_RATIO_HISTOGRAM = Histogram(
labelnames=["method"],
)
-
# This is a heuristic list of ids we have seen used as anonymous. As they frequently
# have significantly more traffic than non-anonymous distinct_ids, and likely
# don't refer to the same underlying person, we prefer to partition them randomly
@@ -388,7 +386,9 @@ def get_event(request):
if random() <= settings.REPLAY_BLOB_INGESTION_TRAFFIC_RATIO:
            # The new flow is only enabled if the dedicated Kafka is enabled
- processed_replay_events += preprocess_replay_events_for_blob_ingestion(replay_events)
+ processed_replay_events += preprocess_replay_events_for_blob_ingestion(
+ replay_events, settings.SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES
+ )
events = processed_replay_events + other_events
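
For context: the extra argument threads the configured size cap into replay preprocessing, so oversized payloads can be split into batches that fit under the producer's request-size limit. A minimal sketch of that kind of size-capped batching (`chunk_by_byte_size` is hypothetical, not the real `preprocess_replay_events_for_blob_ingestion`):

```python
import json
from typing import Any, Dict, List

def chunk_by_byte_size(events: List[Dict[str, Any]], max_bytes: int) -> List[List[Dict[str, Any]]]:
    """Group events into batches whose JSON-serialized size stays under max_bytes."""
    batches: List[List[Dict[str, Any]]] = []
    current: List[Dict[str, Any]] = []
    current_size = 0
    for event in events:
        size = len(json.dumps(event))
        # close the current batch if adding this event would overflow the cap
        if current and current_size + size > max_bytes:
            batches.append(current)
            current, current_size = [], 0
        current.append(event)
        current_size += size
    if current:
        batches.append(current)
    return batches
```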
diff --git a/posthog/api/test/test_capture.py b/posthog/api/test/test_capture.py
index f5047774c7b..ff1677467ea 100644
--- a/posthog/api/test/test_capture.py
+++ b/posthog/api/test/test_capture.py
@@ -1255,7 +1255,7 @@ class TestCapture(BaseTest):
def test_recording_ingestion_can_write_to_blob_ingestion_topic_with_usual_size_limit(self, kafka_produce) -> None:
with self.settings(
REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=1,
- SESSION_RECORDING_KAFKA_MAX_MESSAGE_BYTES="512",
+ SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=512,
):
self._send_session_recording_event(event_data=large_data_array)
topic_counter = Counter([call[1]["topic"] for call in kafka_produce.call_args_list])
@@ -1269,7 +1269,7 @@ class TestCapture(BaseTest):
def test_recording_ingestion_can_write_to_blob_ingestion_topic(self, kafka_produce) -> None:
with self.settings(
REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=1,
- SESSION_RECORDING_KAFKA_MAX_MESSAGE_BYTES="20480",
+ SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=20480,
):
self._send_session_recording_event(event_data=large_data_array)
topic_counter = Counter([call[1]["topic"] for call in kafka_produce.call_args_list])
@@ -1289,9 +1289,8 @@ class TestCapture(BaseTest):
SESSION_RECORDING_KAFKA_HOSTS=["another-server:9092", "a-fourth.server:9092"],
SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL="SSL",
REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=1,
- SESSION_RECORDING_KAFKA_MAX_MESSAGE_BYTES="1234",
+ SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=1234,
):
-
            # avoid logs being printed because the mock is None
session_recording_producer_singleton_mock.return_value = KafkaProducer()
@@ -1305,7 +1304,7 @@ class TestCapture(BaseTest):
"a-fourth.server:9092",
],
kafka_security_protocol="SSL",
- max_message_bytes="1234",
+ max_request_size=1234,
)
@patch("posthog.api.capture.sessionRecordingKafkaProducer")
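
The tests above assert routing by tallying the `topic` kwarg of every mocked produce call. A self-contained illustration of the same pattern (the topic name is illustrative):

```python
from collections import Counter
from unittest.mock import MagicMock

kafka_produce = MagicMock()
kafka_produce(topic="session_recording_snapshot_item_events", data=b"...")
kafka_produce(topic="session_recording_snapshot_item_events", data=b"...")

# call[1] is the kwargs dict of each recorded call, as in the tests above
topic_counter = Counter(call[1]["topic"] for call in kafka_produce.call_args_list)
assert topic_counter["session_recording_snapshot_item_events"] == 2
```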
diff --git a/posthog/kafka_client/client.py b/posthog/kafka_client/client.py
index c566058a6fb..248ff573039 100644
--- a/posthog/kafka_client/client.py
+++ b/posthog/kafka_client/client.py
@@ -99,7 +99,7 @@ class _KafkaProducer:
kafka_base64_keys=None,
kafka_hosts=None,
kafka_security_protocol=None,
- max_message_bytes=None,
+ max_request_size=None,
compression_type=None,
):
if kafka_security_protocol is None:
@@ -119,7 +119,7 @@ class _KafkaProducer:
bootstrap_servers=kafka_hosts,
security_protocol=kafka_security_protocol or _KafkaSecurityProtocol.PLAINTEXT,
compression_type=compression_type,
- **{"max.message.bytes": max_message_bytes} if max_message_bytes else {},
+ **{"max_request_size": max_request_size} if max_request_size else {},
**_sasl_params(),
)
@@ -187,7 +187,7 @@ def sessionRecordingKafkaProducer() -> _KafkaProducer:
return SessionRecordingKafkaProducer(
kafka_hosts=settings.SESSION_RECORDING_KAFKA_HOSTS,
kafka_security_protocol=settings.SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL,
- max_message_bytes=settings.SESSION_RECORDING_KAFKA_MAX_MESSAGE_BYTES,
+ max_request_size=settings.SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES,
compression_type=settings.SESSION_RECORDING_KAFKA_COMPRESSION,
)
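
This rename is the heart of the fix: `max.message.bytes` is a broker/topic-level setting, not a producer option, and kafka-python rejects config keys it does not recognize. The producer-side cap on a single request payload is `max_request_size`, which defaults to 1048576 bytes. A sketch, assuming a broker at localhost:9092:

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    # producer-side cap on one request payload, in bytes; keep it at or below
    # the broker's message.max.bytes (or the topic's max.message.bytes),
    # otherwise the broker will reject the send
    max_request_size=1024 * 1024,
)
```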
diff --git a/posthog/session_recordings/session_recording_helpers.py b/posthog/session_recordings/session_recording_helpers.py
index dfbab3325c1..3b05fc9bd46 100644
--- a/posthog/session_recordings/session_recording_helpers.py
+++ b/posthog/session_recordings/session_recording_helpers.py
@@ -441,5 +441,5 @@ def get_events_summary_from_snapshot_data(snapshot_data: List[SnapshotData]) ->
return events_summary
-def byte_size_dict(d: Dict) -> int:
- return len(json.dumps(d))
+def byte_size_dict(x: Dict | List) -> int:
+ return len(json.dumps(x))
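
The widened signature reflects that the helper now measures whole lists of replay events as well as single dicts. It measures JSON-serialized length, which is what counts against the request-size cap; since `json.dumps` escapes non-ASCII by default, `len` of the result equals its byte length. A quick illustration:

```python
import json

def byte_size_dict(x) -> int:
    return len(json.dumps(x))

assert byte_size_dict({"a": 1}) == len('{"a": 1}')  # 8 bytes
assert byte_size_dict([{"a": 1}, {"b": 2}]) == 20   # lists work too
```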
diff --git a/posthog/settings/data_stores.py b/posthog/settings/data_stores.py
index b1e357cc036..639636e52fe 100644
--- a/posthog/settings/data_stores.py
+++ b/posthog/settings/data_stores.py
@@ -180,7 +180,9 @@ SESSION_RECORDING_KAFKA_HOSTS = _parse_kafka_hosts(os.getenv("SESSION_RECORDING_
# Useful if clickhouse is hosted outside the cluster.
KAFKA_HOSTS_FOR_CLICKHOUSE = _parse_kafka_hosts(os.getenv("KAFKA_URL_FOR_CLICKHOUSE", "")) or KAFKA_HOSTS
-# can set ('gzip', 'snappy', 'lz4', None)
+# can set ('gzip', 'snappy', 'lz4', 'zstd', None)
+# NB: to use a compression type you must install its codec library, since compression happens
+# in the producer, not in Kafka; at the time of writing only 'gzip' and None/'uncompressed' work here
SESSION_RECORDING_KAFKA_COMPRESSION = os.getenv("SESSION_RECORDING_KAFKA_COMPRESSION", None)
# To support e.g. multi-tenant plans on Heroku, we support specifying a prefix for
@@ -191,8 +193,10 @@ KAFKA_PREFIX = os.getenv("KAFKA_PREFIX", "")
KAFKA_BASE64_KEYS = get_from_env("KAFKA_BASE64_KEYS", False, type_cast=str_to_bool)
-SESSION_RECORDING_KAFKA_MAX_MESSAGE_BYTES = get_from_env(
- "SESSION_RECORDING_KAFKA_MAX_MESSAGE_BYTES", None, type_cast=int, optional=True
+SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES: int = get_from_env(
+ "SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES",
+    int(1024 * 1024 * 0.95),  # a little less than 1MiB, leaving headroom for producer overhead
+ type_cast=int,
)
KAFKA_SECURITY_PROTOCOL = os.getenv("KAFKA_SECURITY_PROTOCOL", None)
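
For reference, the default works out to about 5% under kafka-python's 1 MiB `max_request_size` default. The `int(...)` wrap matters because `get_from_env` applies `type_cast` only to values read from the environment; a default passes through as-is. A worked check (the override line is a hypothetical example):

```python
# ~5% under the 1 MiB (1_048_576-byte) producer default
DEFAULT_CAP = int(1024 * 1024 * 0.95)
assert DEFAULT_CAP == 996_147  # bytes

# Override via the environment, e.g. in a deployment manifest:
#   SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=2097152
```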