Mirror of https://github.com/PostHog/posthog.git (synced 2024-11-21 13:39:22 +01:00)

feat: remove og lts recording storage (#25945)

This commit is contained in:
parent 4ef61ac47d
commit 520801f45b
@@ -1,18 +1,12 @@
# EE extended functions for SessionRecording model
import gzip
import json
from datetime import timedelta, datetime
from typing import Optional, cast
from datetime import timedelta

import structlog
from django.utils import timezone
from prometheus_client import Histogram, Counter
from sentry_sdk import capture_exception, capture_message

from posthog import settings
from posthog.session_recordings.models.metadata import PersistedRecordingV1
from posthog.session_recordings.models.session_recording import SessionRecording
from posthog.session_recordings.session_recording_helpers import decompress
from posthog.storage import object_storage

logger = structlog.get_logger(__name__)
@@ -37,40 +31,14 @@ SNAPSHOT_PERSIST_TOO_YOUNG_COUNTER = Counter(
    "Count of session recordings that were too young to be persisted",
)

RECORDING_PERSIST_START_COUNTER = Counter(
    "recording_persist_started",
    "Count of session recordings that were persisted",
)

MINIMUM_AGE_FOR_RECORDING = timedelta(hours=24)


# TODO rename this...
def save_recording_with_new_content(recording: SessionRecording, content: str) -> str:
    if not settings.OBJECT_STORAGE_ENABLED:
        return ""

    logger.info(
        "re-saving recording file into 2023-08-01 LTS storage format",
        recording_id=recording.session_id,
        team_id=recording.team_id,
    )

    target_prefix = recording.build_object_storage_path("2023-08-01")

    start = int(cast(datetime, recording.start_time).timestamp() * 1000)
    end = int(cast(datetime, recording.end_time).timestamp() * 1000)
    new_path = f"{target_prefix}/{start}-{end}"

    zipped_content = gzip.compress(content.encode("utf-8"))
    object_storage.write(
        new_path,
        zipped_content,
        extras={"ContentType": "application/json", "ContentEncoding": "gzip"},
    )

    recording.storage_version = "2023-08-01"
    recording.object_storage_path = target_prefix
    recording.save()

    return new_path


class InvalidRecordingForPersisting(Exception):
    pass
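
The function above is the only writer of the 2023-08-01 LTS format: it gzips the JSONL content and stores it under a single key derived from the recording's start and end timestamps in milliseconds. A minimal sketch of that key construction, using a hypothetical helper name and an example prefix (only the "<prefix>/<start ms>-<end ms>" convention is taken from the code above):

from datetime import UTC, datetime

def lts_blob_key(target_prefix: str, start_time: datetime, end_time: datetime) -> str:
    # the 2023-08-01 key is "<prefix>/<start ms>-<end ms>", mirroring save_recording_with_new_content
    start = int(start_time.timestamp() * 1000)
    end = int(end_time.timestamp() * 1000)
    return f"{target_prefix}/{start}-{end}"

# the timestamps used in the test further down (12345s and 12346s since epoch)
# give a key ending in "/12345000-12346000"
print(lts_blob_key("session_recordings_lts/team_id/1/session_id/abc/data",
                   datetime.fromtimestamp(12345, UTC),
                   datetime.fromtimestamp(12346, UTC)))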
@@ -78,8 +46,6 @@ class InvalidRecordingForPersisting(Exception):
def persist_recording(recording_id: str, team_id: int) -> None:
    """Persist a recording to the S3"""

    logger.info("Persisting recording: init", recording_id=recording_id, team_id=team_id)

    if not settings.OBJECT_STORAGE_ENABLED:
        return
@@ -96,27 +62,18 @@ def persist_recording(recording_id: str, team_id: int) -> None:
        )
        return

    logger.info(
        "Persisting recording: loading metadata...",
        recording_id=recording_id,
        team_id=team_id,
    )
    RECORDING_PERSIST_START_COUNTER.inc()

    recording.load_metadata()

    if not recording.start_time or timezone.now() < recording.start_time + MINIMUM_AGE_FOR_RECORDING:
        # Recording is too recent to be persisted.
        # We can save the metadata as it is still useful for querying, but we can't move to S3 yet.
        logger.info(
            "Persisting recording: skipping as recording start time is less than MINIMUM_AGE_FOR_RECORDING",
            recording_id=recording_id,
            team_id=team_id,
        )
        SNAPSHOT_PERSIST_TOO_YOUNG_COUNTER.inc()
        recording.save()
        return

    target_prefix = recording.build_object_storage_path("2023-08-01")
    target_prefix = recording.build_blob_lts_storage_path("2023-08-01")
    source_prefix = recording.build_blob_ingestion_storage_path()
    # if snapshots are already in blob storage, then we can just copy the files between buckets
    with SNAPSHOT_PERSIST_TIME_HISTOGRAM.time():
@@ -127,12 +84,6 @@ def persist_recording(recording_id: str, team_id: int) -> None:
            recording.object_storage_path = target_prefix
            recording.save()
            SNAPSHOT_PERSIST_SUCCESS_COUNTER.inc()
            logger.info(
                "Persisting recording: done!",
                recording_id=recording_id,
                team_id=team_id,
                source="s3",
            )
            return
        else:
            SNAPSHOT_PERSIST_FAILURE_COUNTER.inc()
@@ -144,53 +95,3 @@ def persist_recording(recording_id: str, team_id: int) -> None:
                source_prefix=source_prefix,
            )
            raise InvalidRecordingForPersisting("Could not persist recording: " + recording_id)


def load_persisted_recording(recording: SessionRecording) -> Optional[PersistedRecordingV1]:
    """Load a persisted recording from S3"""

    logger.info(
        "Persisting recording load: reading from S3...",
        recording_id=recording.session_id,
        storage_version=recording.storage_version,
        path=recording.object_storage_path,
    )

    # originally storage version was written to the stored content
    # some stored content is stored over multiple files, so we can't rely on that
    # future recordings will have the storage version on the model
    # and will not be loaded here
    if not recording.storage_version:
        try:
            content = object_storage.read(str(recording.object_storage_path))
            decompressed = json.loads(decompress(content)) if content else None
            logger.info(
                "Persisting recording load: loaded!",
                recording_id=recording.session_id,
                path=recording.object_storage_path,
            )

            return decompressed
        except object_storage.ObjectStorageError as ose:
            capture_exception(ose)
            logger.error(
                "session_recording.object-storage-load-error",
                recording_id=recording.session_id,
                path=recording.object_storage_path,
                version="2022-12-22",
                exception=ose,
                exc_info=True,
            )

    capture_message(
        "session_recording.load_persisted_recording.unexpected_recording_storage_version",
        extras={
            "recording_id": recording.session_id,
            "storage_version": recording.storage_version,
            "path": recording.object_storage_path,
        },
        tags={
            "team_id": recording.team_id,
        },
    )
    return None
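
With load_persisted_recording removed, persist_recording is the only flow left in this module, and it has a single shape: skip recordings younger than MINIMUM_AGE_FOR_RECORDING, otherwise copy the already blob-ingested objects from the ingestion prefix to the 2023-08-01 LTS prefix and mark the model. A condensed sketch of that control flow under stated assumptions: the copy step is passed in as a callable because the real storage call sits in a part of the hunk not shown here.

from datetime import datetime, timedelta

MINIMUM_AGE_FOR_RECORDING = timedelta(hours=24)

def persist_flow_sketch(recording, now: datetime, copy_objects) -> str:
    # too-young recordings only save their metadata and are retried later
    if not recording.start_time or now < recording.start_time + MINIMUM_AGE_FOR_RECORDING:
        recording.save()
        return "skipped: too young"

    # snapshots are already in blob storage, so persisting is a copy between prefixes
    source_prefix = recording.build_blob_ingestion_storage_path()
    target_prefix = recording.build_blob_lts_storage_path("2023-08-01")
    if copy_objects(source_prefix, target_prefix):
        recording.storage_version = "2023-08-01"
        recording.object_storage_path = target_prefix
        recording.save()
        return "persisted"
    return "failed"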
@@ -1,7 +1,5 @@
import gzip
from datetime import timedelta, datetime, UTC
from secrets import token_urlsafe
from unittest.mock import patch, MagicMock
from uuid import uuid4

from boto3 import resource
@@ -9,11 +7,8 @@ from botocore.config import Config
from freezegun import freeze_time

from ee.session_recordings.session_recording_extensions import (
    load_persisted_recording,
    persist_recording,
    save_recording_with_new_content,
)
from posthog.models.signals import mute_selected_signals
from posthog.session_recordings.models.session_recording import SessionRecording
from posthog.session_recordings.queries.test.session_replay_sql import (
    produce_replay_summary,
@@ -24,7 +19,7 @@ from posthog.settings import (
    OBJECT_STORAGE_SECRET_ACCESS_KEY,
    OBJECT_STORAGE_BUCKET,
)
from posthog.storage.object_storage import write, list_objects
from posthog.storage.object_storage import write, list_objects, object_storage_client
from posthog.test.base import APIBaseTest, ClickhouseTestMixin

long_url = f"https://app.posthog.com/my-url?token={token_urlsafe(600)}"
@@ -64,21 +59,19 @@ class TestSessionRecordingExtensions(ClickhouseTestMixin, APIBaseTest):

        assert not recording.object_storage_path

    def test_can_build_different_object_storage_paths(self) -> None:
    def test_can_build_object_storage_paths(self) -> None:
        produce_replay_summary(
            session_id="test_can_build_different_object_storage_paths-s1",
            team_id=self.team.pk,
        )

        recording: SessionRecording = SessionRecording.objects.create(
            team=self.team,
            session_id="test_can_build_different_object_storage_paths-s1",
        )

        assert (
            recording.build_object_storage_path("2022-12-22")
            == f"session_recordings_lts/team-{self.team.pk}/session-test_can_build_different_object_storage_paths-s1"
        )
        assert (
            recording.build_object_storage_path("2023-08-01")
            recording.build_blob_lts_storage_path("2023-08-01")
            == f"session_recordings_lts/team_id/{self.team.pk}/session_id/test_can_build_different_object_storage_paths-s1/data"
        )
@@ -100,14 +93,21 @@ class TestSessionRecordingExtensions(ClickhouseTestMixin, APIBaseTest):

            # this recording already has several files stored from Mr. Blobby
            # these need to be written before creating the recording object
            blob_path = f"{TEST_BUCKET}/team_id/{self.team.pk}/session_id/{session_id}/data"
            for file in ["a", "b", "c"]:
                blob_path = f"{TEST_BUCKET}/team_id/{self.team.pk}/session_id/{session_id}/data"
                file_name = f"{blob_path}/{file}"
                write(file_name, f"my content-{file}".encode())

            assert object_storage_client().list_objects(OBJECT_STORAGE_BUCKET, blob_path) == [
                f"{blob_path}/a",
                f"{blob_path}/b",
                f"{blob_path}/c",
            ]

            recording: SessionRecording = SessionRecording.objects.create(team=self.team, session_id=session_id)

            assert recording.created_at == two_minutes_ago
            assert recording.storage_version is None

            persist_recording(recording.session_id, recording.team_id)
            recording.refresh_from_db()
@@ -126,47 +126,9 @@ class TestSessionRecordingExtensions(ClickhouseTestMixin, APIBaseTest):
            assert recording.keypress_count == 0
            assert recording.start_url == "https://app.posthog.com/my-url"

            # recordings which were blob ingested can not be loaded with this mechanism
            assert load_persisted_recording(recording) is None

            stored_objects = list_objects(recording.build_object_storage_path("2023-08-01"))
            stored_objects = list_objects(recording.build_blob_lts_storage_path("2023-08-01"))
            assert stored_objects == [
                f"{recording.build_object_storage_path('2023-08-01')}/a",
                f"{recording.build_object_storage_path('2023-08-01')}/b",
                f"{recording.build_object_storage_path('2023-08-01')}/c",
                f"{recording.build_blob_lts_storage_path('2023-08-01')}/a",
                f"{recording.build_blob_lts_storage_path('2023-08-01')}/b",
                f"{recording.build_blob_lts_storage_path('2023-08-01')}/c",
            ]

    @patch("ee.session_recordings.session_recording_extensions.object_storage.write")
    def test_can_save_content_to_new_location(self, mock_write: MagicMock):
        # mute selected signals so the post create signal does not try to persist the recording
        with self.settings(OBJECT_STORAGE_SESSION_RECORDING_BLOB_INGESTION_FOLDER=TEST_BUCKET), mute_selected_signals():
            session_id = f"{uuid4()}"

            recording = SessionRecording.objects.create(
                team=self.team,
                session_id=session_id,
                start_time=datetime.fromtimestamp(12345),
                end_time=datetime.fromtimestamp(12346),
                object_storage_path="some_starting_value",
                # None, but that would trigger the persistence behavior, and we don't want that
                storage_version="None",
            )

            new_key = save_recording_with_new_content(recording, "the new content")

            recording.refresh_from_db()

            expected_path = f"session_recordings_lts/team_id/{self.team.pk}/session_id/{recording.session_id}/data"
            assert new_key == f"{expected_path}/12345000-12346000"

            assert recording.object_storage_path == expected_path
            assert recording.storage_version == "2023-08-01"

            mock_write.assert_called_with(
                f"{expected_path}/12345000-12346000",
                gzip.compress(b"the new content"),
                extras={
                    "ContentEncoding": "gzip",
                    "ContentType": "application/json",
                },
            )
@@ -222,8 +222,22 @@ export const parseEncodedSnapshots = async (
                return []
            }
            try {
                const snapshotLine = typeof l === 'string' ? (JSON.parse(l) as EncodedRecordingSnapshot) : l
                const snapshotData = isRecordingSnapshot(snapshotLine) ? [snapshotLine] : snapshotLine['data']
                let snapshotLine: { windowId: string } | EncodedRecordingSnapshot
                if (typeof l === 'string') {
                    // is loaded from blob or realtime storage
                    snapshotLine = JSON.parse(l) as EncodedRecordingSnapshot
                } else {
                    // is loaded from file export
                    snapshotLine = l
                }
                let snapshotData: ({ windowId: string } | EncodedRecordingSnapshot)[]
                if (isRecordingSnapshot(snapshotLine)) {
                    // is loaded from file export
                    snapshotData = [snapshotLine]
                } else {
                    // is loaded from blob or realtime storage
                    snapshotData = snapshotLine['data']
                }

                if (!isMobileSnapshots) {
                    isMobileSnapshots = hasAnyWireframes(snapshotData)
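
The rewritten branch makes the two input shapes explicit: a line read back from blob or realtime storage arrives as a JSON string and has to be parsed, while a line from a file export is already an object; a single snapshot is then wrapped in a list, whereas a grouped line carries its snapshots in a data array. A rough illustration of those two shapes in Python, with all field values invented for the example:

import json

# a line as read from blob/realtime storage: one JSON string per window (values invented)
blob_line = '{"window_id": "w1", "data": [{"type": 3, "timestamp": 1700000000000}]}'
grouped = json.loads(blob_line)  # must be parsed before its "data" array can be used

# a snapshot from a file export: already a parsed object (values invented)
exported = {"windowId": "w1", "type": 3, "timestamp": 1700000000000}

# either way the caller ends up iterating a list of snapshot dicts
snapshots = grouped["data"] if "data" in grouped else [grouped]
print(len(snapshots))  # 1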
@@ -56,9 +56,3 @@ class RecordingMetadata(TypedDict):

class RecordingMatchingEvents(TypedDict):
    events: list[MatchingSessionRecordingEvent]


class PersistedRecordingV1(TypedDict):
    version: str  # "2022-12-22"
    snapshot_data_by_window_id: dict[WindowId, list[Union[SnapshotData, SessionRecordingEventSummary]]]
    distinct_id: str
@@ -147,23 +147,14 @@ class SessionRecording(UUIDModel):
        SessionRecordingViewed.objects.get_or_create(team=self.team, user=user, session_id=self.session_id)
        self.viewed = True

    def build_object_storage_path(self, version: Literal["2023-08-01", "2022-12-22"]) -> str:
        if version == "2022-12-22":
            path_parts: list[str] = [
                settings.OBJECT_STORAGE_SESSION_RECORDING_LTS_FOLDER,
                f"team-{self.team_id}",
                f"session-{self.session_id}",
            ]
            return "/".join(path_parts)
        elif version == "2023-08-01":
            return self._build_session_blob_path(settings.OBJECT_STORAGE_SESSION_RECORDING_LTS_FOLDER)
    def build_blob_lts_storage_path(self, version: Literal["2023-08-01"]) -> str:
        if version == "2023-08-01":
            return self.build_blob_ingestion_storage_path(settings.OBJECT_STORAGE_SESSION_RECORDING_LTS_FOLDER)
        else:
            raise NotImplementedError(f"Unknown session replay object storage version {version}")

    def build_blob_ingestion_storage_path(self) -> str:
        return self._build_session_blob_path(settings.OBJECT_STORAGE_SESSION_RECORDING_BLOB_INGESTION_FOLDER)

    def _build_session_blob_path(self, root_prefix: str) -> str:
    def build_blob_ingestion_storage_path(self, root_prefix: Optional[str] = None) -> str:
        root_prefix = root_prefix or settings.OBJECT_STORAGE_SESSION_RECORDING_BLOB_INGESTION_FOLDER
        return f"{root_prefix}/team_id/{self.team_id}/session_id/{self.session_id}/data"

    @staticmethod
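
Both builders now produce the same team_id/session_id/data layout and differ only in the root prefix: the LTS folder for build_blob_lts_storage_path and the blob-ingestion folder for build_blob_ingestion_storage_path. A minimal sketch of the resulting keys, using the session_recordings_lts folder name asserted in the tests earlier in this diff and a placeholder for the ingestion folder setting:

def session_blob_path(root_prefix: str, team_id: int, session_id: str) -> str:
    # shared layout used by both storage path builders
    return f"{root_prefix}/team_id/{team_id}/session_id/{session_id}/data"

# LTS copy of a recording
print(session_blob_path("session_recordings_lts", 1, "some-session-id"))
# session_recordings_lts/team_id/1/session_id/some-session-id/data

# freshly ingested blobs live under whatever OBJECT_STORAGE_SESSION_RECORDING_BLOB_INGESTION_FOLDER points at
print(session_blob_path("<ingestion-folder>", 1, "some-session-id"))
# <ingestion-folder>/team_id/1/session_id/some-session-id/data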
@@ -55,9 +55,6 @@ from posthog.session_recordings.realtime_snapshots import (
    get_realtime_snapshots,
    publish_subscription,
)
from posthog.session_recordings.snapshots.convert_legacy_snapshots import (
    convert_original_version_lts_recording,
)
from posthog.storage import object_storage

SNAPSHOTS_BY_PERSONAL_API_KEY_COUNTER = Counter(
@@ -554,21 +551,9 @@ class SessionRecordingViewSet(TeamAndOrgViewSetMixin, viewsets.GenericViewSet, U
        blob_prefix = ""

        if recording.object_storage_path:
            if recording.storage_version == "2023-08-01":
                blob_prefix = recording.object_storage_path
                blob_keys = object_storage.list_objects(cast(str, blob_prefix))
            else:
                # originally LTS files were in a single file
                # TODO this branch can be deleted after 01-08-2024
                sources.append(
                    {
                        "source": "blob",
                        "start_timestamp": recording.start_time,
                        "end_timestamp": recording.end_time,
                        "blob_key": recording.object_storage_path,
                    }
                )
            might_have_realtime = False
            blob_prefix = recording.object_storage_path
            blob_keys = object_storage.list_objects(cast(str, blob_prefix))
            might_have_realtime = False
        else:
            blob_prefix = recording.build_blob_ingestion_storage_path()
            blob_keys = object_storage.list_objects(blob_prefix)
@@ -771,11 +756,12 @@ class SessionRecordingViewSet(TeamAndOrgViewSetMixin, viewsets.GenericViewSet, U
            if recording.storage_version == "2023-08-01":
                file_key = f"{recording.object_storage_path}/{blob_key}"
            else:
                # this is a legacy recording, we need to load the file from the old path
                file_key = convert_original_version_lts_recording(recording)
                raise NotImplementedError(
                    f"Unknown session replay object storage version {recording.storage_version}"
                )
        else:
            blob_prefix = settings.OBJECT_STORAGE_SESSION_RECORDING_BLOB_INGESTION_FOLDER
            file_key = f"{blob_prefix}/team_id/{self.team.pk}/session_id/{recording.session_id}/data/{blob_key}"
            file_key = f"{recording.build_blob_ingestion_storage_path(root_prefix=blob_prefix)}/{blob_key}"
        url = object_storage.get_presigned_url(file_key, expiration=60)
        if not url:
            raise exceptions.NotFound("Snapshot file not found")
@@ -1,86 +0,0 @@
import json

import structlog
from prometheus_client import Histogram

from posthog.session_recordings.models.session_recording import SessionRecording
from posthog.session_recordings.session_recording_helpers import decompress
from posthog.storage import object_storage

logger = structlog.get_logger(__name__)

RECORDING_CONVERSION_TIME_HISTOGRAM = Histogram(
    "recording_conversion_time_seconds",
    "We convert legacy recordings from LTS format to the latest format, how long does that take?",
)


def _save_converted_content_back_to_storage(converted_content: str, recording: SessionRecording) -> str:
    try:
        from ee.session_recordings.session_recording_extensions import (
            save_recording_with_new_content,
        )

        return save_recording_with_new_content(recording, converted_content)
    except ImportError:
        # not running in EE context... shouldn't get here
        logger.exception(
            "attempted_to_save_converted_content_back_to_storage_in_non_ee_context",
            recording_id=recording.id,
        )
        return ""


def convert_original_version_lts_recording(recording: SessionRecording) -> str:
    # the original version of the LTS recording was a single file
    # its contents were gzipped and then base64 encoded.
    # we can't simply stream it back to the requester

    with RECORDING_CONVERSION_TIME_HISTOGRAM.time():
        content = object_storage.read(str(recording.object_storage_path))
        if not content:
            # TODO and if there's no content is this right?
            logger.error(
                "attempted_to_convert_original_version_lts_recording_with_no_content",
                recording_id=recording.id,
                object_storage_path=recording.object_storage_path,
            )
            return ""

        converted_content = _prepare_legacy_content(content)

        original_path = recording.object_storage_path
        new_file_key = _save_converted_content_back_to_storage(converted_content, recording)

        # TODO we should delete the old recording from storage here, but might not have permissions
        object_storage.tag(str(original_path), {"converted": "true"})

        return new_file_key


def _prepare_legacy_content(content: str) -> str:
    # historically we stored the recording as a single file with a base64 encoded gzipped json string
    # using utf-16 encoding, this `decompress` method unwinds that back to a json string
    decoded_content = decompress(content)
    json_content = json.loads(decoded_content)
    return _convert_legacy_format_from_lts_storage(json_content)


def _convert_legacy_format_from_lts_storage(lts_formatted_data: dict) -> str:
    """
    The latest version is JSONL formatted data.
    Each line is json containing a window_id and a data array.
    This is equivalent to the LTS format snapshot_data_by_window_id property dumped as a single line.
    """
    if "snapshot_data_by_window_id" not in lts_formatted_data:
        raise ValueError("Invalid LTS format: missing snapshot_data_by_window_id")

    if "version" not in lts_formatted_data or lts_formatted_data["version"] != "2022-12-22":
        raise ValueError(f"Invalid LTS format: version is {lts_formatted_data.get('version', 'missing')}")

    snapshot_data_by_window_id = lts_formatted_data["snapshot_data_by_window_id"]
    converted = ""
    for window_id, data in snapshot_data_by_window_id.items():
        converted += json.dumps({"window_id": window_id, "data": data}, separators=(",", ":")) + "\n"

    return converted.rstrip("\n")
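
The deleted module above was the shim that upgraded original single-file LTS recordings on read. Its core transformation is simple: take the 2022-12-22 payload's snapshot_data_by_window_id mapping and emit one JSON line per window. A self-contained sketch of that transformation, with a made-up payload shaped like the one _convert_legacy_format_from_lts_storage validates:

import json

# a made-up 2022-12-22 style payload
legacy = {
    "version": "2022-12-22",
    "distinct_id": "user-1",
    "snapshot_data_by_window_id": {
        "w1": [{"type": 3, "timestamp": 1}],
        "w2": [{"type": 3, "timestamp": 2}],
    },
}

# one compact JSON line per window, matching the JSONL format used by blob storage
lines = [
    json.dumps({"window_id": window_id, "data": data}, separators=(",", ":"))
    for window_id, data in legacy["snapshot_data_by_window_id"].items()
]
print("\n".join(lines))
# {"window_id":"w1","data":[{"type":3,"timestamp":1}]}
# {"window_id":"w2","data":[{"type":3,"timestamp":2}]}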
@@ -4870,46 +4870,6 @@
          AND "posthog_persondistinctid"."team_id" = 99999)
  '''
# ---
# name: TestSessionRecordings.test_listing_recordings_is_not_nplus1_for_persons.193
  '''
  SELECT "posthog_sessionrecordingviewed"."session_id"
  FROM "posthog_sessionrecordingviewed"
  WHERE ("posthog_sessionrecordingviewed"."team_id" = 2
         AND "posthog_sessionrecordingviewed"."user_id" = 2)
  '''
# ---
# name: TestSessionRecordings.test_listing_recordings_is_not_nplus1_for_persons.194
  '''
  SELECT "posthog_persondistinctid"."id",
         "posthog_persondistinctid"."team_id",
         "posthog_persondistinctid"."person_id",
         "posthog_persondistinctid"."distinct_id",
         "posthog_persondistinctid"."version",
         "posthog_person"."id",
         "posthog_person"."created_at",
         "posthog_person"."properties_last_updated_at",
         "posthog_person"."properties_last_operation",
         "posthog_person"."team_id",
         "posthog_person"."properties",
         "posthog_person"."is_user_id",
         "posthog_person"."is_identified",
         "posthog_person"."uuid",
         "posthog_person"."version"
  FROM "posthog_persondistinctid"
  INNER JOIN "posthog_person" ON ("posthog_persondistinctid"."person_id" = "posthog_person"."id")
  WHERE ("posthog_persondistinctid"."distinct_id" IN ('user1',
                                                      'user10',
                                                      'user2',
                                                      'user3',
                                                      'user4',
                                                      'user5',
                                                      'user6',
                                                      'user7',
                                                      'user8',
                                                      'user9')
         AND "posthog_persondistinctid"."team_id" = 2)
  '''
# ---
# name: TestSessionRecordings.test_listing_recordings_is_not_nplus1_for_persons.2
  '''
  SELECT "posthog_organizationmembership"."id",
@@ -1,10 +1,8 @@
import uuid
from unittest.mock import patch, MagicMock, call

from rest_framework import status

from posthog.models import Team
from posthog.models.signals import mute_selected_signals
from posthog.session_recordings.models.session_recording import SessionRecording
from posthog.session_recordings.test import setup_stream_from
from posthog.test.base import APIBaseTest, ClickhouseTestMixin, QueryMatchingTest
@@ -77,50 +75,6 @@ class TestSessionRecordings(APIBaseTest, ClickhouseTestMixin, QueryMatchingTest)
        ],
    }

    @patch(
        "posthog.session_recordings.queries.session_replay_events.SessionReplayEvents.exists",
        return_value=True,
    )
    @patch("posthog.session_recordings.session_recording_api.object_storage.list_objects")
    def test_original_version_stored_snapshots_can_be_gathered(
        self, mock_list_objects: MagicMock, _mock_exists: MagicMock
    ) -> None:
        session_id = str(uuid.uuid4())
        lts_storage_path = "1234-5678"

        def list_objects_func(_path: str) -> list[str]:
            return []

        mock_list_objects.side_effect = list_objects_func

        with mute_selected_signals():
            SessionRecording.objects.create(
                team=self.team,
                session_id=session_id,
                storage_version=None,
                object_storage_path=lts_storage_path,
                start_time="1970-01-01T00:00:00.001000Z",
                end_time="1970-01-01T00:00:00.002000Z",
            )

        response = self.client.get(f"/api/projects/{self.team.id}/session_recordings/{session_id}/snapshots?version=2")
        assert response.status_code == status.HTTP_200_OK
        response_data = response.json()

        assert mock_list_objects.call_args_list == []

        assert response_data == {
            "sources": [
                {
                    # original version had a single path that was its blob key
                    "blob_key": lts_storage_path,
                    "source": "blob",
                    "start_timestamp": "1970-01-01T00:00:00.001000Z",
                    "end_timestamp": "1970-01-01T00:00:00.002000Z",
                }
            ],
        }

    @patch(
        "posthog.session_recordings.queries.session_replay_events.SessionReplayEvents.exists",
        return_value=True,
@@ -176,74 +130,3 @@ class TestSessionRecordings(APIBaseTest, ClickhouseTestMixin, QueryMatchingTest)
        ]

        assert response_data == "Example content"

    @patch(
        "posthog.session_recordings.queries.session_replay_events.SessionReplayEvents.exists",
        return_value=True,
    )
    @patch("posthog.session_recordings.session_recording_api.stream_from", return_value=setup_stream_from())
    @patch("posthog.session_recordings.session_recording_api.object_storage.tag")
    @patch("posthog.session_recordings.session_recording_api.object_storage.write")
    @patch("posthog.session_recordings.session_recording_api.object_storage.read")
    @patch("posthog.session_recordings.session_recording_api.object_storage.get_presigned_url")
    @patch("posthog.session_recordings.session_recording_api.object_storage.list_objects")
    def test_original_version_stored_snapshots_can_be_loaded_without_upversion(
        self,
        mock_list_objects: MagicMock,
        mock_get_presigned_url: MagicMock,
        mock_read: MagicMock,
        mock_write: MagicMock,
        mock_tag: MagicMock,
        mock_requests: MagicMock,
        _mock_exists: MagicMock,
    ) -> None:
        session_id = str(uuid.uuid4())
        lts_storage_path = "1234-5678"

        def list_objects_func(path: str) -> list[str]:
            return []

        mock_list_objects.side_effect = list_objects_func
        mock_get_presigned_url.return_value = "https://example.com"
        mock_read.return_value = legacy_compressed_original

        with mute_selected_signals():
            SessionRecording.objects.create(
                team=self.team,
                session_id=session_id,
                # to avoid auto-persistence kicking in when this is None
                storage_version="not a know version",
                object_storage_path=lts_storage_path,
                start_time="1970-01-01T00:00:00.001000Z",
                end_time="1970-01-01T00:00:00.002000Z",
            )

        query_parameters = [
            "source=blob",
            "version=2",
            f"blob_key={lts_storage_path}",
        ]

        mock_write.reset_mock()  # reset the mock to remove setup calls
        response = self.client.get(
            f"/api/projects/{self.team.id}/session_recordings/{session_id}/snapshots?{'&'.join(query_parameters)}"
        )
        assert response.status_code == status.HTTP_200_OK

        assert mock_list_objects.call_args_list == []

        expected_path = f"session_recordings_lts/team_id/{self.team.pk}/session_id/{session_id}/data/1-2"

        # the content was saved to the new location
        assert mock_write.call_args_list[0][0][0] == expected_path
        # the original location was tagged
        assert mock_tag.call_args_list == [call(lts_storage_path, {"converted": "true"})]

        # the original saved path isn't loaded for reading the content
        assert mock_get_presigned_url.call_args_list == [
            call(expected_path, expiration=60),
        ]

        # and the mock content is returned
        response_data = response.content.decode("utf-8")
        assert response_data == "Example content"
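
The expected_path assertion in the removed test above ends in /1-2 because the converted copy reuses the "<start ms>-<end ms>" key convention from save_recording_with_new_content: the recording was created with start_time 1970-01-01T00:00:00.001Z and end_time 1970-01-01T00:00:00.002Z, i.e. 1 ms and 2 ms after the epoch. A quick check of that arithmetic:

from datetime import datetime

start = datetime.fromisoformat("1970-01-01T00:00:00.001000+00:00")
end = datetime.fromisoformat("1970-01-01T00:00:00.002000+00:00")

# same millisecond conversion used when writing the LTS blob key
print(int(start.timestamp() * 1000), int(end.timestamp() * 1000))  # 1 2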
@@ -742,12 +742,6 @@ class TestSessionRecordings(APIBaseTest, ClickhouseTestMixin, QueryMatchingTest)
                    "end_timestamp": "2023-01-01T00:00:00Z",
                    "blob_key": "1672531195000-1672531200000",
                },
                {
                    "source": "realtime",
                    "start_timestamp": "2022-12-31T23:59:55Z",
                    "end_timestamp": None,
                    "blob_key": None,
                },
            ]
        }
        assert mock_list_objects.call_args_list == [