
fix: message size and compression settings for session recording production (#16171)

* fix: message size and compression settings for session recording production

* fix config setting

* actually use the message size

* Update query snapshots

* Update query snapshots

* fix default and tests

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
Paul D'Ambra authored 2023-06-21 14:35:12 +01:00, committed by GitHub
parent ffe14c75e7
commit 6835b4d658
6 changed files with 23 additions and 18 deletions
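The gist of the change: the session recording producer was being handed the broker/topic-style key max.message.bytes, while the Python producer's client-side knob is max_request_size. As a rough illustration of how the renamed settings map onto the producer (a minimal sketch assuming the kafka-python client, not code from this commit):

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    compression_type="gzip",       # compression is applied by the producer, not by Kafka itself
    max_request_size=512 * 1024,   # client-side cap on a single produce request, in bytes
)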

View File

@@ -14,9 +14,11 @@
     <env name="KEA_VERBOSE_LOGGING" value="false" />
     <env name="PRINT_SQL" value="1" />
     <env name="PYTHONUNBUFFERED" value="1" />
-    <env name="SESSION_RECORDING_KAFKA_HOSTS" value="localhost" />
-    <env name="SKIP_SERVICE_VERSION_REQUIREMENTS" value="1" />
     <env name="REPLAY_BLOB_INGESTION_TRAFFIC_RATIO" value="1" />
+    <env name="SESSION_RECORDING_KAFKA_COMPRESSION" value="gzip" />
+    <env name="SESSION_RECORDING_KAFKA_HOSTS" value="localhost" />
+    <env name="SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES" value="524288" />
+    <env name="SKIP_SERVICE_VERSION_REQUIREMENTS" value="1" />
   </envs>
   <option name="SDK_HOME" value="$PROJECT_DIR$/env/bin/python" />
   <option name="SDK_NAME" value="Python 3.10 (posthog)" />

View File

@@ -58,7 +58,6 @@ LOG_RATE_LIMITER = Limiter(
 SESSION_RECORDING_DEDICATED_KAFKA_EVENTS = ("$snapshot_items",)
 SESSION_RECORDING_EVENT_NAMES = ("$snapshot", "$performance_event") + SESSION_RECORDING_DEDICATED_KAFKA_EVENTS
 EVENTS_RECEIVED_COUNTER = Counter(
     "capture_events_received_total",
     "Events received by capture, tagged by resource type.",
@@ -102,7 +101,6 @@ REPLAY_INGESTION_BATCH_COMPRESSION_RATIO_HISTOGRAM = Histogram(
     labelnames=["method"],
 )
 # This is a heuristic of ids we have seen used as anonymous. As they frequently
 # have significantly more traffic than non-anonymous distinct_ids, and likely
 # don't refer to the same underlying person we prefer to partition them randomly
@@ -388,7 +386,9 @@ def get_event(request):
         if random() <= settings.REPLAY_BLOB_INGESTION_TRAFFIC_RATIO:
             # The new flow we only enable if the dedicated kafka is enabled
-            processed_replay_events += preprocess_replay_events_for_blob_ingestion(replay_events)
+            processed_replay_events += preprocess_replay_events_for_blob_ingestion(
+                replay_events, settings.SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES
+            )
         events = processed_replay_events + other_events
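This hunk is where the new setting is actually used: the producer's request-size limit is passed into blob-ingestion preprocessing so replay payloads can be budgeted against it. A self-contained sketch of that kind of size budgeting (illustrative only; the real preprocess_replay_events_for_blob_ingestion in PostHog may split or handle events differently):

import json
from typing import Any, Dict, List

def pack_events_under_limit(events: List[Dict[str, Any]], max_request_size_bytes: int) -> List[List[Dict[str, Any]]]:
    # Greedily pack events into batches whose JSON-serialized size stays under the producer limit.
    batches: List[List[Dict[str, Any]]] = []
    current: List[Dict[str, Any]] = []
    current_size = 0
    for event in events:
        event_size = len(json.dumps(event))
        if current and current_size + event_size > max_request_size_bytes:
            batches.append(current)
            current, current_size = [], 0
        current.append(event)
        current_size += event_size
    if current:
        batches.append(current)
    return batches

# e.g. with the 512KB limit from the run configuration above:
# batches = pack_events_under_limit(replay_events, 512 * 1024)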

View File

@@ -1255,7 +1255,7 @@ class TestCapture(BaseTest):
     def test_recording_ingestion_can_write_to_blob_ingestion_topic_with_usual_size_limit(self, kafka_produce) -> None:
         with self.settings(
             REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=1,
-            SESSION_RECORDING_KAFKA_MAX_MESSAGE_BYTES="512",
+            SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=512,
         ):
             self._send_session_recording_event(event_data=large_data_array)
             topic_counter = Counter([call[1]["topic"] for call in kafka_produce.call_args_list])
@@ -1269,7 +1269,7 @@ class TestCapture(BaseTest):
     def test_recording_ingestion_can_write_to_blob_ingestion_topic(self, kafka_produce) -> None:
         with self.settings(
             REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=1,
-            SESSION_RECORDING_KAFKA_MAX_MESSAGE_BYTES="20480",
+            SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=20480,
         ):
             self._send_session_recording_event(event_data=large_data_array)
             topic_counter = Counter([call[1]["topic"] for call in kafka_produce.call_args_list])
@@ -1289,9 +1289,8 @@ class TestCapture(BaseTest):
             SESSION_RECORDING_KAFKA_HOSTS=["another-server:9092", "a-fourth.server:9092"],
             SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL="SSL",
             REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=1,
-            SESSION_RECORDING_KAFKA_MAX_MESSAGE_BYTES="1234",
+            SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=1234,
         ):
             # avoid logs from being printed because the mock is None
             session_recording_producer_singleton_mock.return_value = KafkaProducer()
@@ -1305,7 +1304,7 @@ class TestCapture(BaseTest):
                 "a-fourth.server:9092",
             ],
             kafka_security_protocol="SSL",
-            max_message_bytes="1234",
+            max_request_size=1234,
         )
     @patch("posthog.api.capture.sessionRecordingKafkaProducer")

View File

@@ -99,7 +99,7 @@ class _KafkaProducer:
         kafka_base64_keys=None,
         kafka_hosts=None,
         kafka_security_protocol=None,
-        max_message_bytes=None,
+        max_request_size=None,
         compression_type=None,
     ):
         if kafka_security_protocol is None:
@@ -119,7 +119,7 @@
             bootstrap_servers=kafka_hosts,
             security_protocol=kafka_security_protocol or _KafkaSecurityProtocol.PLAINTEXT,
             compression_type=compression_type,
-            **{"max.message.bytes": max_message_bytes} if max_message_bytes else {},
+            **{"max_request_size": max_request_size} if max_request_size else {},
             **_sasl_params(),
         )
@@ -187,7 +187,7 @@ def sessionRecordingKafkaProducer() -> _KafkaProducer:
     return SessionRecordingKafkaProducer(
         kafka_hosts=settings.SESSION_RECORDING_KAFKA_HOSTS,
         kafka_security_protocol=settings.SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL,
-        max_message_bytes=settings.SESSION_RECORDING_KAFKA_MAX_MESSAGE_BYTES,
+        max_request_size=settings.SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES,
         compression_type=settings.SESSION_RECORDING_KAFKA_COMPRESSION,
     )

View File

@@ -441,5 +441,5 @@ def get_events_summary_from_snapshot_data(snapshot_data: List[SnapshotData]) ->
     return events_summary
-def byte_size_dict(d: Dict) -> int:
-    return len(json.dumps(d))
+def byte_size_dict(x: Dict | List) -> int:
+    return len(json.dumps(x))
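byte_size_dict is widened here to accept lists as well as dicts, presumably because replay snapshot payloads arrive as arrays of events. A quick sanity check of the widened helper (written with typing.Union so it also runs on Python versions before 3.10; the diff itself uses the newer Dict | List syntax):

import json
from typing import Dict, List, Union

def byte_size_dict(x: Union[Dict, List]) -> int:
    # Size of the JSON-serialized payload, used when budgeting Kafka message sizes.
    return len(json.dumps(x))

assert byte_size_dict({"a": 1}) == len('{"a": 1}')
assert byte_size_dict([{"a": 1}, {"b": 2}]) == len('[{"a": 1}, {"b": 2}]')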

View File

@@ -180,7 +180,9 @@ SESSION_RECORDING_KAFKA_HOSTS = _parse_kafka_hosts(os.getenv("SESSION_RECORDING_
 # Useful if clickhouse is hosted outside the cluster.
 KAFKA_HOSTS_FOR_CLICKHOUSE = _parse_kafka_hosts(os.getenv("KAFKA_URL_FOR_CLICKHOUSE", "")) or KAFKA_HOSTS
-# can set ('gzip', 'snappy', 'lz4', None)
+# can set ('gzip', 'snappy', 'lz4', 'zstd', None)
+# NB if you want to set a compression you need to install it... the producer compresses, not kafka,
+# so, at the time of writing, only 'gzip' and None/'uncompressed' are available
 SESSION_RECORDING_KAFKA_COMPRESSION = os.getenv("SESSION_RECORDING_KAFKA_COMPRESSION", None)
 # To support e.g. multi-tenanted plans on Heroku, we support specifying a prefix for
@@ -191,8 +193,10 @@ KAFKA_PREFIX = os.getenv("KAFKA_PREFIX", "")
 KAFKA_BASE64_KEYS = get_from_env("KAFKA_BASE64_KEYS", False, type_cast=str_to_bool)
-SESSION_RECORDING_KAFKA_MAX_MESSAGE_BYTES = get_from_env(
-    "SESSION_RECORDING_KAFKA_MAX_MESSAGE_BYTES", None, type_cast=int, optional=True
+SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES: int = get_from_env(
+    "SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES",
+    1024 * 1024 * 0.95,  # a little less than 1MB to account for overhead
+    type_cast=int,
 )
 KAFKA_SECURITY_PROTOCOL = os.getenv("KAFKA_SECURITY_PROTOCOL", None)
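When the environment variable is unset, the new default works out to roughly 996147 bytes (1024 * 1024 * 0.95), i.e. a little under the 1MB that, as far as I know, kafka-python uses as its own max_request_size default, leaving headroom for protocol overhead. A simplified stand-in for the get_from_env call above, using plain os.getenv for illustration only:

import os

# Simplified stand-in for PostHog's get_from_env helper; not the actual settings code.
SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES: int = int(
    os.getenv("SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES", str(int(1024 * 1024 * 0.95)))
)
# -> 996147 when unset: just under 1MB, leaving room for message framing overhead.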