From 067d73cb4f3359a8f01275b77c01e6f6df805019 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Tue, 9 May 2023 15:41:16 +0100 Subject: [PATCH] feat: write recording summary events (#15245) Problem see #15200 (comment) When we store session recording events we materialize a lot of information using the snapshot data column. We'll soon not be storing the snapshot data so won't be able to use that to materialize that information, so we need to capture it earlier in the pipeline. Since this is only used for searching for/summarizing recordings we don't need to store every event. Changes We'll push a summary event to a new kafka topic during ingestion. ClickHouse can ingest from that topic into an aggregating merge tree table. So that we store (in theory, although not in practice) only one row per session. add config to turn this on and off by team in plugin server add code behind that to write session recording summary events to a new topic in kafka add ClickHouse tables to ingest and aggregate those summary events --- .github/workflows/ci-backend.yml | 16 +- .github/workflows/pr-deploy.yml | 1 + .run/Plugin Server.run.xml | 3 +- .../player/utils/segmenter.ts | 8 + plugin-server/src/config/config.ts | 4 +- plugin-server/src/config/kafka-topics.ts | 1 + .../scheduled-tasks-consumer.ts | 3 +- .../session-recordings-consumer.ts | 71 +++++++- .../session-recording/snapshot-segmenter.ts | 111 ++++++++++++ plugin-server/src/main/pluginsServer.ts | 1 + plugin-server/src/types.ts | 2 + .../src/worker/ingestion/process-event.ts | 76 ++++++++ .../session-recordings-consumer.test.ts | 80 ++++++++- .../tests/main/process-event.test.ts | 151 ++++++++++++++++ .../migrations/0043_session_replay_events.py | 17 ++ posthog/clickhouse/schema.py | 10 ++ .../test/__snapshots__/test_schema.ambr | 168 ++++++++++++++++++ posthog/kafka_client/topics.py | 1 + posthog/models/session_replay_event/sql.py | 136 ++++++++++++++ .../test/session_replay_sql.py | 119 +++++++++++++ .../test/test_session_replay_summaries.py | 157 ++++++++++++++++ posthog/test/base.py | 38 +++- 22 files changed, 1146 insertions(+), 28 deletions(-) create mode 100644 plugin-server/src/main/ingestion-queues/session-recording/snapshot-segmenter.ts create mode 100644 posthog/clickhouse/migrations/0043_session_replay_events.py create mode 100644 posthog/models/session_replay_event/sql.py create mode 100644 posthog/queries/session_recordings/test/session_replay_sql.py create mode 100644 posthog/queries/session_recordings/test/test_session_replay_summaries.py diff --git a/.github/workflows/ci-backend.yml b/.github/workflows/ci-backend.yml index 89660936549..3713a3dbb3b 100644 --- a/.github/workflows/ci-backend.yml +++ b/.github/workflows/ci-backend.yml @@ -117,8 +117,7 @@ jobs: if: steps.cache-backend-tests.outputs.cache-hit != 'true' run: | cd current - python -m pip install -r requirements-dev.txt - python -m pip install -r requirements.txt + python -m pip install -r requirements.txt -r requirements-dev.txt - name: Check for syntax errors, import sort, and code style violations run: | @@ -199,8 +198,7 @@ jobs: - name: Install python dependencies if: steps.cache-backend-tests.outputs.cache-hit != 'true' run: | - python -m pip install -r requirements-dev.txt - python -m pip install -r requirements.txt + python -m pip install -r requirements.txt -r requirements-dev.txt - uses: actions/checkout@v3 with: @@ -210,8 +208,7 @@ jobs: run: | # We need to ensure we have requirements for the master branch # now also, so we can run migrations up to master. 
- python -m pip install -r requirements-dev.txt - python -m pip install -r requirements.txt + python -m pip install -r requirements.txt -r requirements-dev.txt python manage.py migrate - uses: actions/checkout@v3 @@ -356,8 +353,7 @@ jobs: - name: Install python dependencies if: steps.cache-backend-tests.outputs.cache-hit != 'true' run: | - python -m pip install -r master/requirements-dev.txt - python -m pip install -r master/requirements.txt + python -m pip install -r master/requirements.txt -r master/requirements-dev.txt - name: Wait for Clickhouse & Kafka run: master/bin/check_kafka_clickhouse_up @@ -376,9 +372,7 @@ jobs: - name: Install requirements.txt dependencies with pip at current branch run: | cd current - python -m pip install --upgrade pip - python -m pip install -r requirements.txt - python -m pip install freezegun fakeredis pytest pytest-mock pytest-django syrupy + python -m pip install -r requirements.txt -r requirements-dev.txt - name: Link posthog-cloud at current branch run: | diff --git a/.github/workflows/pr-deploy.yml b/.github/workflows/pr-deploy.yml index e11e73db2fa..b61554097df 100644 --- a/.github/workflows/pr-deploy.yml +++ b/.github/workflows/pr-deploy.yml @@ -61,6 +61,7 @@ jobs: export RELEASE_NAME=posthog export NAMESPACE=pr-$PR_NUM-${BRANCH_NAME//\//-} export NAMESPACE=${NAMESPACE:0:38} + export NAMESPACE=${NAMESPACE%%-} export HOSTNAME=$NAMESPACE export TAILNET_NAME=hedgehog-kitefin export TS_AUTHKEY=${{ secrets.TAILSCALE_SERVICE_AUTHKEY }} diff --git a/.run/Plugin Server.run.xml b/.run/Plugin Server.run.xml index cfa9467dac7..7f90cb5f797 100644 --- a/.run/Plugin Server.run.xml +++ b/.run/Plugin Server.run.xml @@ -13,7 +13,8 @@ + - \ No newline at end of file + diff --git a/frontend/src/scenes/session-recordings/player/utils/segmenter.ts b/frontend/src/scenes/session-recordings/player/utils/segmenter.ts index 87a8596670b..8af9840bd9d 100644 --- a/frontend/src/scenes/session-recordings/player/utils/segmenter.ts +++ b/frontend/src/scenes/session-recordings/player/utils/segmenter.ts @@ -2,6 +2,14 @@ import { EventType, IncrementalSource, eventWithTime } from '@rrweb/types' import { Dayjs } from 'lib/dayjs' import { RecordingSegment, RecordingSnapshot } from '~/types' +/** + * This file is copied into the plugin server to calculate activeMilliseconds on ingestion + * plugin-server/src/main/ingestion-queues/session-recording/snapshot-segmenter.ts + * + * Changes here should be reflected there + * TODO add code sharing between plugin-server and front-end so that this duplication is unnecessary + */ + const activeSources = [ IncrementalSource.MouseMove, IncrementalSource.MouseInteraction, diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index dd2bcbcc502..c6e3cda6cc4 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -113,13 +113,15 @@ export function getDefaultConfig(): PluginsServerConfig { USE_KAFKA_FOR_SCHEDULED_TASKS: true, CLOUD_DEPLOYMENT: 'default', // Used as a Sentry tag - SESSION_RECORDING_BLOB_PROCESSING_TEAMS: '', // TODO: BW Change this to 'all' when we release it fully + SESSION_RECORDING_BLOB_PROCESSING_TEAMS: '', // TODO: Change this to 'all' when we release it fully SESSION_RECORDING_LOCAL_DIRECTORY: '.tmp/sessions', SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS: 60 * 10, // NOTE: 10 minutes SESSION_RECORDING_MAX_BUFFER_SIZE_KB: ['dev', 'test'].includes(process.env.NODE_ENV || 'undefined') ? 
1024 // NOTE: ~1MB in dev or test, so that even with gzipped content we still flush pretty frequently : 1024 * 50, // ~50MB after compression in prod SESSION_RECORDING_REMOTE_FOLDER: 'session_recordings', + + SESSION_RECORDING_SUMMARY_INGESTION_ENABLED_TEAMS: '', // TODO: Change this to 'all' when we release it fully } } diff --git a/plugin-server/src/config/kafka-topics.ts b/plugin-server/src/config/kafka-topics.ts index f9f2e88fd3b..b03ef5f24ca 100644 --- a/plugin-server/src/config/kafka-topics.ts +++ b/plugin-server/src/config/kafka-topics.ts @@ -10,6 +10,7 @@ export const KAFKA_PERSON = `${prefix}clickhouse_person${suffix}` export const KAFKA_PERSON_UNIQUE_ID = `${prefix}clickhouse_person_unique_id${suffix}` export const KAFKA_PERSON_DISTINCT_ID = `${prefix}clickhouse_person_distinct_id${suffix}` export const KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = `${prefix}clickhouse_session_recording_events${suffix}` +export const KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = `${prefix}clickhouse_session_replay_events${suffix}` export const KAFKA_PERFORMANCE_EVENTS = `${prefix}clickhouse_performance_events${suffix}` export const KAFKA_EVENTS_PLUGIN_INGESTION = `${prefix}events_plugin_ingestion${suffix}` export const KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW = `${prefix}events_plugin_ingestion_overflow${suffix}` diff --git a/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts b/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts index 473f68f023d..a867ca42bcd 100644 --- a/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts @@ -10,8 +10,7 @@ import { instrumentEachBatch, setupEventHandlers } from './kafka-queue' import { latestOffsetTimestampGauge } from './metrics' // The valid task types that can be scheduled. -// TODO: not sure if there is another place that defines these but it would be -// good to unify. +// TODO: not sure if there is another place that defines these but it would be good to unify. 
const taskTypes = ['runEveryMinute', 'runEveryHour', 'runEveryDay'] as const export const startScheduledTasksConsumer = async ({ diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index c8be1315fd3..3864fc9530d 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -1,8 +1,10 @@ import { PluginEvent } from '@posthog/plugin-scaffold' +import { captureException } from '@sentry/node' import { HighLevelProducer as RdKafkaProducer, Message, NumberNullUndefined } from 'node-rdkafka-acosom' import { KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS, + KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS, KAFKA_PERFORMANCE_EVENTS, KAFKA_SESSION_RECORDING_EVENTS, KAFKA_SESSION_RECORDING_EVENTS_DLQ, @@ -14,7 +16,12 @@ import { createKafkaProducer, disconnectProducer, flushProducer, produce } from import { PipelineEvent, RawEventMessage, Team } from '../../../types' import { KafkaConfig } from '../../../utils/db/hub' import { status } from '../../../utils/status' -import { createPerformanceEvent, createSessionRecordingEvent } from '../../../worker/ingestion/process-event' +import { + createPerformanceEvent, + createSessionRecordingEvent, + createSessionReplayEvent, + SummarizedSessionRecordingEvent, +} from '../../../worker/ingestion/process-event' import { TeamManager } from '../../../worker/ingestion/team-manager' import { parseEventTimestamp } from '../../../worker/ingestion/timestamps' import { eventDroppedCounter } from '../metrics' @@ -25,12 +32,14 @@ export const startSessionRecordingEventsConsumer = async ({ consumerMaxBytes, consumerMaxBytesPerPartition, consumerMaxWaitMs, + summaryIngestionEnabledTeams, }: { teamManager: TeamManager kafkaConfig: KafkaConfig consumerMaxBytes: number consumerMaxBytesPerPartition: number consumerMaxWaitMs: number + summaryIngestionEnabledTeams: string }) => { /* For Session Recordings we need to prepare the data for ClickHouse. @@ -59,7 +68,13 @@ export const startSessionRecordingEventsConsumer = async ({ const connectionConfig = createRdConnectionConfigFromEnvVars(kafkaConfig) const producer = await createKafkaProducer(connectionConfig) - const eachBatchWithContext = eachBatch({ teamManager, producer }) + + const eachBatchWithContext = eachBatch({ + teamManager, + producer, + summaryEnabledTeams: + summaryIngestionEnabledTeams === 'all' ? null : summaryIngestionEnabledTeams.split(',').map(parseInt), + }) // Create a node-rdkafka consumer that fetches batches of messages, runs // eachBatchWithContext, then commits offsets for the batch. @@ -84,7 +99,15 @@ export const startSessionRecordingEventsConsumer = async ({ } export const eachBatch = - ({ teamManager, producer }: { teamManager: TeamManager; producer: RdKafkaProducer }) => + ({ + teamManager, + producer, + summaryEnabledTeams, + }: { + teamManager: TeamManager + producer: RdKafkaProducer + summaryEnabledTeams: number[] | null + }) => async (messages: Message[]) => { // To start with, we simply process each message in turn, // without attempting to perform any concurrency. There is a lot @@ -105,7 +128,7 @@ export const eachBatch = // DependencyUnavailableError error to distinguish between // intermittent and permanent errors. 
const pendingProduceRequests: Promise[] = [] - const eachMessageWithContext = eachMessage({ teamManager, producer }) + const eachMessageWithContext = eachMessage({ teamManager, producer, summaryEnabledTeams }) for (const message of messages) { const results = await retryOnDependencyUnavailableError(() => eachMessageWithContext(message)) @@ -148,7 +171,15 @@ export const eachBatch = } const eachMessage = - ({ teamManager, producer }: { teamManager: TeamManager; producer: RdKafkaProducer }) => + ({ + teamManager, + producer, + summaryEnabledTeams, + }: { + teamManager: TeamManager + producer: RdKafkaProducer + summaryEnabledTeams: number[] | null + }) => async (message: Message) => { // For each message, we: // @@ -252,7 +283,23 @@ const eachMessage = event.properties || {} ) - return [ + let replayRecord: null | SummarizedSessionRecordingEvent = null + try { + if (summaryEnabledTeams === null || summaryEnabledTeams?.includes(team.id)) { + replayRecord = createSessionReplayEvent( + messagePayload.uuid, + team.id, + messagePayload.distinct_id, + event.ip, + event.properties || {} + ) + } + } catch (e) { + status.warn('??', 'session_replay_summarizer_error', { error: e }) + captureException(e) + } + + const producePromises = [ produce({ producer, topic: KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS, @@ -260,6 +307,18 @@ const eachMessage = key: message.key ? Buffer.from(message.key) : null, }), ] + + if (replayRecord) { + producePromises.push( + produce({ + producer, + topic: KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS, + value: Buffer.from(JSON.stringify(replayRecord)), + key: message.key ? Buffer.from(message.key) : null, + }) + ) + } + return producePromises } else if (event.event === '$performance_event') { const clickHouseRecord = createPerformanceEvent( messagePayload.uuid, diff --git a/plugin-server/src/main/ingestion-queues/session-recording/snapshot-segmenter.ts b/plugin-server/src/main/ingestion-queues/session-recording/snapshot-segmenter.ts new file mode 100644 index 00000000000..3e2cb6bcadd --- /dev/null +++ b/plugin-server/src/main/ingestion-queues/session-recording/snapshot-segmenter.ts @@ -0,0 +1,111 @@ +/** + * This file is a cut-down version of the segmenter.ts + * https://github.com/PostHog/posthog/blob/db2deaf650d2eca9addba5e3f304a17a21041f25/frontend/src/scenes/session-recordings/player/utils/segmenter.ts + * + * It has been modified to not need the same dependencies + * Any changes may need to be sync'd between the two + */ + +const activeSources = [1, 2, 3, 4, 5, 6, 7, 12] + +const ACTIVITY_THRESHOLD_MS = 5000 + +export interface RRWebEventSummaryData { + href?: string + source?: number + payload?: Record +} + +export interface RRWebEventSummary { + timestamp: number + type: number + data: RRWebEventSummaryData + windowId: string +} + +interface RecordingSegment { + kind: 'window' | 'buffer' | 'gap' + startTimestamp: number // Epoch time that the segment starts + endTimestamp: number // Epoch time that the segment ends + durationMs: number + windowId?: string + isActive: boolean +} + +const isActiveEvent = (event: RRWebEventSummary): boolean => { + return event.type === 3 && activeSources.includes(event.data?.source || -1) +} + +const createSegments = (snapshots: RRWebEventSummary[]): RecordingSegment[] => { + let segments: RecordingSegment[] = [] + let activeSegment!: Partial + let lastActiveEventTimestamp = 0 + + snapshots.forEach((snapshot) => { + const eventIsActive = isActiveEvent(snapshot) + lastActiveEventTimestamp = eventIsActive ? 
snapshot.timestamp : lastActiveEventTimestamp + + // When do we create a new segment? + // 1. If we don't have one yet + let isNewSegment = !activeSegment + + // 2. If it is currently inactive but a new "active" event comes in + if (eventIsActive && !activeSegment?.isActive) { + isNewSegment = true + } + + // 3. If it is currently active but no new active event has been seen for the activity threshold + if (activeSegment?.isActive && lastActiveEventTimestamp + ACTIVITY_THRESHOLD_MS < snapshot.timestamp) { + isNewSegment = true + } + + // 4. If windowId changes we create a new segment + if (activeSegment?.windowId !== snapshot.windowId) { + isNewSegment = true + } + + if (isNewSegment) { + if (activeSegment) { + segments.push(activeSegment as RecordingSegment) + } + + activeSegment = { + kind: 'window', + startTimestamp: snapshot.timestamp, + windowId: snapshot.windowId, + isActive: eventIsActive, + } + } + + activeSegment.endTimestamp = snapshot.timestamp + }) + + if (activeSegment) { + segments.push(activeSegment as RecordingSegment) + } + + segments = segments.map((segment) => { + // These can all be done in a loop at the end... + segment.durationMs = segment.endTimestamp - segment.startTimestamp + return segment + }) + + return segments +} + +/** + * TODO add code sharing between plugin-server and front-end so that this method can + * call the same createSegments function as the front-end + */ +export const activeMilliseconds = (snapshots: RRWebEventSummary[]): number => { + const segments = createSegments(snapshots) + return segments.reduce((acc, segment) => { + if (segment.isActive) { + // if the segment is active but has no duration we count it as 1ms + // to distinguish it from segments with no activity at all + return acc + Math.max(1, segment.durationMs) + } + + return acc + }, 0) +} diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index 41317480afc..59a01c43989 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -405,6 +405,7 @@ export async function startPluginsServer( consumerMaxBytes: serverConfig.KAFKA_CONSUMPTION_MAX_BYTES, consumerMaxBytesPerPartition: serverConfig.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION, consumerMaxWaitMs: serverConfig.KAFKA_CONSUMPTION_MAX_WAIT_MS, + summaryIngestionEnabledTeams: serverConfig.SESSION_RECORDING_SUMMARY_INGESTION_ENABLED_TEAMS, }) stopSessionRecordingEventsConsumer = stop joinSessionRecordingEventsConsumer = join diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 510f75b4d54..baf9c479050 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -188,6 +188,8 @@ export interface PluginsServerConfig { SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS: number SESSION_RECORDING_MAX_BUFFER_SIZE_KB: number SESSION_RECORDING_REMOTE_FOLDER: string + + SESSION_RECORDING_SUMMARY_INGESTION_ENABLED_TEAMS: string } export interface Hub extends PluginsServerConfig { diff --git a/plugin-server/src/worker/ingestion/process-event.ts b/plugin-server/src/worker/ingestion/process-event.ts index b13f3d54773..6ff9e9f09ff 100644 --- a/plugin-server/src/worker/ingestion/process-event.ts +++ b/plugin-server/src/worker/ingestion/process-event.ts @@ -3,6 +3,7 @@ import { PluginEvent, Properties } from '@posthog/plugin-scaffold' import * as Sentry from '@sentry/node' import { DateTime } from 'luxon' +import { activeMilliseconds, RRWebEventSummary } from '../../main/ingestion-queues/session-recording/snapshot-segmenter' import { Element, 
GroupTypeIndex, @@ -272,6 +273,81 @@ export const createSessionRecordingEvent = ( return data } + +export interface SummarizedSessionRecordingEvent { + uuid: string + first_timestamp: string + last_timestamp: string + team_id: number + distinct_id: string + session_id: string + first_url: string | undefined + click_count: number + keypress_count: number + mouse_activity_count: number + active_milliseconds: number +} + +export const createSessionReplayEvent = ( + uuid: string, + team_id: number, + distinct_id: string, + ip: string | null, + properties: Properties +) => { + const eventsSummaries: RRWebEventSummary[] = properties['$snapshot_data']?.['events_summary'] || [] + + const timestamps = eventsSummaries + .filter((eventSummary: RRWebEventSummary) => { + return !!eventSummary?.timestamp + }) + .map((eventSummary: RRWebEventSummary) => { + return castTimestampOrNow(DateTime.fromMillis(eventSummary.timestamp), TimestampFormat.ClickHouse) + }) + .sort() + + if (eventsSummaries.length === 0 || timestamps.length === 0) { + return null + } + + let clickCount = 0 + let keypressCount = 0 + let mouseActivity = 0 + let url: string | undefined = undefined + eventsSummaries.forEach((eventSummary: RRWebEventSummary) => { + if (eventSummary.type === 3) { + mouseActivity += 1 + if (eventSummary.data?.source === 2) { + clickCount += 1 + } + if (eventSummary.data?.source === 5) { + keypressCount += 1 + } + } + if (!!eventSummary.data?.href?.trim().length && url === undefined) { + url = eventSummary.data.href + } + }) + + const activeTime = activeMilliseconds(eventsSummaries) + + const data: SummarizedSessionRecordingEvent = { + uuid, + team_id: team_id, + distinct_id: distinct_id, + session_id: properties['$session_id'], + first_timestamp: timestamps[0], + last_timestamp: timestamps[timestamps.length - 1], + click_count: clickCount, + keypress_count: keypressCount, + mouse_activity_count: mouseActivity, + first_url: url, + active_milliseconds: activeTime, + } + + return data +} + export function createPerformanceEvent(uuid: string, team_id: number, distinct_id: string, properties: Properties) { const data: Partial = { uuid, diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts index 6e9fc72093c..5edc4990806 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts @@ -18,7 +18,7 @@ describe('session-recordings-consumer', () => { beforeEach(() => { postgres = new Pool({ connectionString: defaultConfig.DATABASE_URL }) teamManager = new TeamManager(postgres, {} as any) - eachBachWithDependencies = eachBatch({ producer, teamManager }) + eachBachWithDependencies = eachBatch({ producer, teamManager, summaryEnabledTeams: [] }) }) afterEach(() => { @@ -63,4 +63,82 @@ describe('session-recordings-consumer', () => { // Should have send to the DLQ. 
expect(producer.produce).toHaveBeenCalledTimes(1) }) + + test('eachBatch emits to only one topic', async () => { + const organizationId = await createOrganization(postgres) + const teamId = await createTeam(postgres, organizationId) + + await eachBachWithDependencies([ + { + key: 'test', + value: JSON.stringify({ team_id: teamId, data: JSON.stringify({ event: '$snapshot' }) }), + timestamp: 123, + }, + ]) + + expect(producer.produce).toHaveBeenCalledTimes(1) + }) + + test('eachBatch can emit to two topics', async () => { + const organizationId = await createOrganization(postgres) + const teamId = await createTeam(postgres, organizationId) + + const eachBachWithDependencies: any = eachBatch({ producer, teamManager, summaryEnabledTeams: null }) + + await eachBachWithDependencies([ + { + key: 'test', + value: JSON.stringify({ + team_id: teamId, + data: JSON.stringify({ + event: '$snapshot', + properties: { $snapshot_data: { events_summary: [{ timestamp: 12345 }] } }, + }), + }), + timestamp: 123, + }, + ]) + + expect(producer.produce).toHaveBeenCalledTimes(2) + }) + + test('eachBatch can emit to two topics for a specific team', async () => { + const organizationId = await createOrganization(postgres) + const teamId = await createTeam(postgres, organizationId) + + const eachBachWithDependencies: any = eachBatch({ producer, teamManager, summaryEnabledTeams: [teamId] }) + + await eachBachWithDependencies([ + { + key: 'test', + value: JSON.stringify({ + team_id: teamId, + data: JSON.stringify({ + event: '$snapshot', + properties: { $snapshot_data: { events_summary: [{ timestamp: 12345 }] } }, + }), + }), + timestamp: 123, + }, + ]) + + expect(producer.produce).toHaveBeenCalledTimes(2) + }) + + test('eachBatch can emit to only one topic when team is not summary enabled', async () => { + const organizationId = await createOrganization(postgres) + const teamId = await createTeam(postgres, organizationId) + + const eachBachWithDependencies: any = eachBatch({ producer, teamManager, summaryEnabledTeams: [teamId + 1] }) + + await eachBachWithDependencies([ + { + key: 'test', + value: JSON.stringify({ team_id: teamId, data: JSON.stringify({ event: '$snapshot' }) }), + timestamp: 123, + }, + ]) + + expect(producer.produce).toHaveBeenCalledTimes(1) + }) }) diff --git a/plugin-server/tests/main/process-event.test.ts b/plugin-server/tests/main/process-event.test.ts index 7c970317e8a..9de8037ca77 100644 --- a/plugin-server/tests/main/process-event.test.ts +++ b/plugin-server/tests/main/process-event.test.ts @@ -11,6 +11,7 @@ import * as IORedis from 'ioredis' import { DateTime } from 'luxon' import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../../src/config/kafka-topics' +import { RRWebEventSummary } from '../../src/main/ingestion-queues/session-recording/snapshot-segmenter' import { ClickHouseEvent, Database, @@ -29,7 +30,9 @@ import { EventPipelineRunner } from '../../src/worker/ingestion/event-pipeline/r import { createPerformanceEvent, createSessionRecordingEvent, + createSessionReplayEvent, EventsProcessor, + SummarizedSessionRecordingEvent, } from '../../src/worker/ingestion/process-event' import { delayUntilEventIngested, resetTestDatabaseClickhouse } from '../helpers/clickhouse' import { resetKafka } from '../helpers/kafka' @@ -1208,6 +1211,154 @@ test('snapshot event stored as session_recording_event', () => { window_id: undefined, }) }) +const sessionReplayEventTestCases: { + snapshotData: { events_summary: RRWebEventSummary[] } + expected: Pick< + SummarizedSessionRecordingEvent, + | 'click_count' + | 
'keypress_count' + | 'mouse_activity_count' + | 'first_url' + | 'first_timestamp' + | 'last_timestamp' + | 'active_milliseconds' + > +}[] = [ + { + snapshotData: { events_summary: [{ timestamp: 1682449093469, type: 3, data: { source: 2 }, windowId: '1' }] }, + expected: { + click_count: 1, + keypress_count: 0, + mouse_activity_count: 1, + first_url: undefined, + first_timestamp: '2023-04-25 18:58:13.469', + last_timestamp: '2023-04-25 18:58:13.469', + active_milliseconds: 1, // one event, but it's active, so active time is 1ms not 0 + }, + }, + { + snapshotData: { events_summary: [{ timestamp: 1682449093469, type: 3, data: { source: 5 }, windowId: '1' }] }, + expected: { + click_count: 0, + keypress_count: 1, + mouse_activity_count: 1, + first_url: undefined, + first_timestamp: '2023-04-25 18:58:13.469', + last_timestamp: '2023-04-25 18:58:13.469', + active_milliseconds: 1, // one event, but it's active, so active time is 1ms not 0 + }, + }, + { + snapshotData: { + events_summary: [ + { + timestamp: 1682449093693, + type: 5, + data: { + payload: { + // doesn't match because href is nested in payload + href: 'http://127.0.0.1:8000/home', + }, + }, + windowId: '1', + }, + { + timestamp: 1682449093469, + type: 4, + data: { + href: 'http://127.0.0.1:8000/second/url', + }, + windowId: '1', + }, + ], + }, + expected: { + click_count: 0, + keypress_count: 0, + mouse_activity_count: 0, + first_url: 'http://127.0.0.1:8000/second/url', + first_timestamp: '2023-04-25 18:58:13.469', + last_timestamp: '2023-04-25 18:58:13.693', + active_milliseconds: 0, // no data.source, so no activity + }, + }, + { + snapshotData: { + events_summary: [ + // three windows with 1 second, 2 seconds, and 3 seconds of activity + // even though they overlap they should be summed separately + { timestamp: 1682449093000, type: 3, data: { source: 2 }, windowId: '1' }, + { timestamp: 1682449094000, type: 3, data: { source: 2 }, windowId: '1' }, + { timestamp: 1682449095000, type: 3, data: { source: 2 }, windowId: '2' }, + { timestamp: 1682449097000, type: 3, data: { source: 2 }, windowId: '2' }, + { timestamp: 1682449096000, type: 3, data: { source: 2 }, windowId: '3' }, + { timestamp: 1682449099000, type: 3, data: { source: 2 }, windowId: '3' }, + ], + }, + expected: { + click_count: 6, + keypress_count: 0, + mouse_activity_count: 6, + first_url: undefined, + first_timestamp: '2023-04-25 18:58:13.000', + last_timestamp: '2023-04-25 18:58:19.000', + active_milliseconds: 6000, // can sum up the activity across windows + }, + }, +] +sessionReplayEventTestCases.forEach(({ snapshotData, expected }) => { + test(`snapshot event ${JSON.stringify(snapshotData)} can be stored as session_replay_event`, () => { + const data = createSessionReplayEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', '', { + $session_id: 'abcf-efg', + $snapshot_data: snapshotData, + } as any as Properties) + + const expectedEvent: SummarizedSessionRecordingEvent = { + distinct_id: '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', + session_id: 'abcf-efg', + team_id: 2, + uuid: 'some-id', + ...expected, + } + expect(data).toEqual(expectedEvent) + }) +}) + +test(`snapshot event with no event summary is ignored`, () => { + const data = createSessionReplayEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', '', { + $session_id: 'abcf-efg', + $snapshot_data: {}, + } as any as Properties) + + expect(data).toEqual(null) +}) + +test(`snapshot event with no event summary timestamps is ignored`, () => { + const data = 
createSessionReplayEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', '', { + $session_id: 'abcf-efg', + $snapshot_data: { + events_summary: [ + { + type: 5, + data: { + payload: { + // doesn't match because href is nested in payload + href: 'http://127.0.0.1:8000/home', + }, + }, + }, + { + type: 4, + data: { + href: 'http://127.0.0.1:8000/second/url', + }, + }, + ], + }, + } as any as Properties) + + expect(data).toEqual(null) +}) test('performance event stored as performance_event', () => { const data = createPerformanceEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', { diff --git a/posthog/clickhouse/migrations/0043_session_replay_events.py b/posthog/clickhouse/migrations/0043_session_replay_events.py new file mode 100644 index 00000000000..1fde598e3ea --- /dev/null +++ b/posthog/clickhouse/migrations/0043_session_replay_events.py @@ -0,0 +1,17 @@ +from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions +from posthog.models.session_replay_event.sql import ( + SESSION_REPLAY_EVENTS_TABLE_MV_SQL, + KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL, + SESSION_REPLAY_EVENTS_TABLE_SQL, + DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL, + WRITABLE_SESSION_REPLAY_EVENTS_TABLE_SQL, +) + + +operations = [ + run_sql_with_exceptions(WRITABLE_SESSION_REPLAY_EVENTS_TABLE_SQL()), + run_sql_with_exceptions(DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL()), + run_sql_with_exceptions(SESSION_REPLAY_EVENTS_TABLE_SQL()), + run_sql_with_exceptions(KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL()), + run_sql_with_exceptions(SESSION_REPLAY_EVENTS_TABLE_MV_SQL()), +] diff --git a/posthog/clickhouse/schema.py b/posthog/clickhouse/schema.py index 909298b32ef..aa31fb4343a 100644 --- a/posthog/clickhouse/schema.py +++ b/posthog/clickhouse/schema.py @@ -28,6 +28,12 @@ from posthog.models.person_overrides.sql import ( PERSON_OVERRIDES_CREATE_TABLE_SQL, ) from posthog.models.session_recording_event.sql import * +from posthog.models.session_replay_event.sql import ( + KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL, + DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL, + SESSION_REPLAY_EVENTS_TABLE_SQL, + SESSION_REPLAY_EVENTS_TABLE_MV_SQL, +) CREATE_MERGETREE_TABLE_QUERIES = ( CREATE_COHORTPEOPLE_TABLE_SQL, @@ -44,6 +50,7 @@ CREATE_MERGETREE_TABLE_QUERIES = ( INGESTION_WARNINGS_DATA_TABLE_SQL, APP_METRICS_DATA_TABLE_SQL, PERFORMANCE_EVENTS_TABLE_SQL, + SESSION_REPLAY_EVENTS_TABLE_SQL, ) CREATE_DISTRIBUTED_TABLE_QUERIES = ( WRITABLE_EVENTS_TABLE_SQL, @@ -54,6 +61,7 @@ CREATE_DISTRIBUTED_TABLE_QUERIES = ( DISTRIBUTED_APP_METRICS_TABLE_SQL, WRITABLE_PERFORMANCE_EVENTS_TABLE_SQL, DISTRIBUTED_PERFORMANCE_EVENTS_TABLE_SQL, + DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL, ) CREATE_KAFKA_TABLE_QUERIES = ( KAFKA_DEAD_LETTER_QUEUE_TABLE_SQL, @@ -68,6 +76,7 @@ CREATE_KAFKA_TABLE_QUERIES = ( KAFKA_INGESTION_WARNINGS_TABLE_SQL, KAFKA_APP_METRICS_TABLE_SQL, KAFKA_PERFORMANCE_EVENTS_TABLE_SQL, + KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL, ) CREATE_MV_TABLE_QUERIES = ( DEAD_LETTER_QUEUE_TABLE_MV_SQL, @@ -82,6 +91,7 @@ CREATE_MV_TABLE_QUERIES = ( INGESTION_WARNINGS_MV_TABLE_SQL, APP_METRICS_MV_TABLE_SQL, PERFORMANCE_EVENTS_TABLE_MV_SQL, + SESSION_REPLAY_EVENTS_TABLE_MV_SQL, ) CREATE_TABLE_QUERIES = ( diff --git a/posthog/clickhouse/test/__snapshots__/test_schema.ambr b/posthog/clickhouse/test/__snapshots__/test_schema.ambr index 844c73783a8..5011ce5fd2b 100644 --- a/posthog/clickhouse/test/__snapshots__/test_schema.ambr +++ b/posthog/clickhouse/test/__snapshots__/test_schema.ambr @@ -318,6 +318,25 @@ ' --- +# name: 
test_create_kafka_table_with_different_kafka_host[kafka_session_replay_events] + ' + + CREATE TABLE IF NOT EXISTS kafka_session_replay_events ON CLUSTER 'posthog' + ( + session_id VARCHAR, + team_id Int64, + distinct_id VARCHAR, + first_timestamp DateTime64(6, 'UTC'), + last_timestamp DateTime64(6, 'UTC'), + first_url Nullable(VARCHAR), + click_count Int64, + keypress_count Int64, + mouse_activity_count Int64, + active_milliseconds Int64 + ) ENGINE = Kafka('test.kafka.broker:9092', 'clickhouse_session_replay_events_test', 'group1', 'JSONEachRow') + + ' +--- # name: test_create_table_query[app_metrics] ' @@ -880,6 +899,25 @@ ' --- +# name: test_create_table_query[kafka_session_replay_events] + ' + + CREATE TABLE IF NOT EXISTS kafka_session_replay_events ON CLUSTER 'posthog' + ( + session_id VARCHAR, + team_id Int64, + distinct_id VARCHAR, + first_timestamp DateTime64(6, 'UTC'), + last_timestamp DateTime64(6, 'UTC'), + first_url Nullable(VARCHAR), + click_count Int64, + keypress_count Int64, + mouse_activity_count Int64, + active_milliseconds Int64 + ) ENGINE = Kafka('kafka:9092', 'clickhouse_session_replay_events_test', 'group1', 'JSONEachRow') + + ' +--- # name: test_create_table_query[performance_events] ' @@ -1273,6 +1311,60 @@ ' --- +# name: test_create_table_query[session_replay_events] + ' + + CREATE TABLE IF NOT EXISTS session_replay_events ON CLUSTER 'posthog' + ( + -- part of order by so will aggregate correctly + session_id VARCHAR, + -- part of order by so will aggregate correctly + team_id Int64, + -- ClickHouse will pick any value of distinct_id for the session + -- this is fine since even if the distinct_id changes during a session + -- it will still (or should still) map to the same person + distinct_id VARCHAR, + min_first_timestamp SimpleAggregateFunction(min, DateTime64(6, 'UTC')), + max_last_timestamp SimpleAggregateFunction(max, DateTime64(6, 'UTC')), + first_url AggregateFunction(argMin, Nullable(VARCHAR), DateTime64(6, 'UTC')), + click_count SimpleAggregateFunction(sum, Int64), + keypress_count SimpleAggregateFunction(sum, Int64), + mouse_activity_count SimpleAggregateFunction(sum, Int64), + active_milliseconds SimpleAggregateFunction(sum, Int64) + ) ENGINE = Distributed('posthog', 'posthog_test', 'sharded_session_replay_events', sipHash64(distinct_id)) + + ' +--- +# name: test_create_table_query[session_replay_events_mv] + ' + + CREATE MATERIALIZED VIEW IF NOT EXISTS session_replay_events_mv ON CLUSTER 'posthog' + TO posthog_test.writable_session_replay_events + AS SELECT + session_id, + team_id, + any(distinct_id) as distinct_id, + min(first_timestamp) AS min_first_timestamp, + max(last_timestamp) AS max_last_timestamp, + -- TRICKY: ClickHouse will pick a relatively random first_url + -- when it collapses the aggregating merge tree + -- unless we teach it what we want... 
+ -- argMin ignores null values + -- so this will get the first non-null value of first_url + -- for each group of session_id and team_id + -- by min of first_timestamp in the batch + -- this is an aggregate function, not a simple aggregate function + -- so we have to write to argMinState, and query with argMinMerge + argMinState(first_url, first_timestamp) as first_url, + sum(click_count) as click_count, + sum(keypress_count) as keypress_count, + sum(mouse_activity_count) as mouse_activity_count, + sum(active_milliseconds) as active_milliseconds + FROM posthog_test.kafka_session_replay_events + group by session_id, team_id + + ' +--- # name: test_create_table_query[sharded_app_metrics] ' @@ -1475,6 +1567,44 @@ ' --- +# name: test_create_table_query[sharded_session_replay_events] + ' + + CREATE TABLE IF NOT EXISTS sharded_session_replay_events ON CLUSTER 'posthog' + ( + -- part of order by so will aggregate correctly + session_id VARCHAR, + -- part of order by so will aggregate correctly + team_id Int64, + -- ClickHouse will pick any value of distinct_id for the session + -- this is fine since even if the distinct_id changes during a session + -- it will still (or should still) map to the same person + distinct_id VARCHAR, + min_first_timestamp SimpleAggregateFunction(min, DateTime64(6, 'UTC')), + max_last_timestamp SimpleAggregateFunction(max, DateTime64(6, 'UTC')), + first_url AggregateFunction(argMin, Nullable(VARCHAR), DateTime64(6, 'UTC')), + click_count SimpleAggregateFunction(sum, Int64), + keypress_count SimpleAggregateFunction(sum, Int64), + mouse_activity_count SimpleAggregateFunction(sum, Int64), + active_milliseconds SimpleAggregateFunction(sum, Int64) + ) ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.session_replay_events', '{replica}') + + PARTITION BY toYYYYMM(min_first_timestamp) + -- order by is used by the aggregating merge tree engine to + -- identify candidates to merge, e.g. 
toDate(min_first_timestamp) + -- would mean we would have one row per day per session_id + -- if CH could completely merge to match the order by + -- it is also used to organise data to make queries faster + -- we want the fewest rows possible but also the fastest queries + -- since we query by date and not by time + -- and order by must be in order of increasing cardinality + -- so we order by date first, then team_id, then session_id + -- hopefully, this is a good balance between the two + ORDER BY (toDate(min_first_timestamp), team_id, session_id) + SETTINGS index_granularity=512 + + ' +--- # name: test_create_table_query[writable_events] ' @@ -2051,3 +2181,41 @@ ' --- +# name: test_create_table_query_replicated_and_storage[sharded_session_replay_events] + ' + + CREATE TABLE IF NOT EXISTS sharded_session_replay_events ON CLUSTER 'posthog' + ( + -- part of order by so will aggregate correctly + session_id VARCHAR, + -- part of order by so will aggregate correctly + team_id Int64, + -- ClickHouse will pick any value of distinct_id for the session + -- this is fine since even if the distinct_id changes during a session + -- it will still (or should still) map to the same person + distinct_id VARCHAR, + min_first_timestamp SimpleAggregateFunction(min, DateTime64(6, 'UTC')), + max_last_timestamp SimpleAggregateFunction(max, DateTime64(6, 'UTC')), + first_url AggregateFunction(argMin, Nullable(VARCHAR), DateTime64(6, 'UTC')), + click_count SimpleAggregateFunction(sum, Int64), + keypress_count SimpleAggregateFunction(sum, Int64), + mouse_activity_count SimpleAggregateFunction(sum, Int64), + active_milliseconds SimpleAggregateFunction(sum, Int64) + ) ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.session_replay_events', '{replica}') + + PARTITION BY toYYYYMM(min_first_timestamp) + -- order by is used by the aggregating merge tree engine to + -- identify candidates to merge, e.g. 
toDate(min_first_timestamp) + -- would mean we would have one row per day per session_id + -- if CH could completely merge to match the order by + -- it is also used to organise data to make queries faster + -- we want the fewest rows possible but also the fastest queries + -- since we query by date and not by time + -- and order by must be in order of increasing cardinality + -- so we order by date first, then team_id, then session_id + -- hopefully, this is a good balance between the two + ORDER BY (toDate(min_first_timestamp), team_id, session_id) + SETTINGS index_granularity=512 + + ' +--- diff --git a/posthog/kafka_client/topics.py b/posthog/kafka_client/topics.py index 1e70be20666..d9bd032305b 100644 --- a/posthog/kafka_client/topics.py +++ b/posthog/kafka_client/topics.py @@ -9,6 +9,7 @@ KAFKA_PERSON = f"{KAFKA_PREFIX}clickhouse_person{SUFFIX}" KAFKA_PERSON_UNIQUE_ID = f"{KAFKA_PREFIX}clickhouse_person_unique_id{SUFFIX}" # DEPRECATED_DO_NOT_USE KAFKA_PERSON_DISTINCT_ID = f"{KAFKA_PREFIX}clickhouse_person_distinct_id{SUFFIX}" KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = f"{KAFKA_PREFIX}clickhouse_session_recording_events{SUFFIX}" +KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = f"{KAFKA_PREFIX}clickhouse_session_replay_events{SUFFIX}" KAFKA_PERFORMANCE_EVENTS = f"{KAFKA_PREFIX}clickhouse_performance_events{SUFFIX}" KAFKA_PLUGIN_LOG_ENTRIES = f"{KAFKA_PREFIX}plugin_log_entries{SUFFIX}" KAFKA_DEAD_LETTER_QUEUE = f"{KAFKA_PREFIX}events_dead_letter_queue{SUFFIX}" diff --git a/posthog/models/session_replay_event/sql.py b/posthog/models/session_replay_event/sql.py new file mode 100644 index 00000000000..64c13029e41 --- /dev/null +++ b/posthog/models/session_replay_event/sql.py @@ -0,0 +1,136 @@ +from django.conf import settings + +from posthog.clickhouse.kafka_engine import kafka_engine +from posthog.clickhouse.table_engines import Distributed, ReplicationScheme, AggregatingMergeTree +from posthog.kafka_client.topics import KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS + +SESSION_REPLAY_EVENTS_DATA_TABLE = lambda: "sharded_session_replay_events" + +""" +Kafka needs slightly different column setup. It receives individual events, not aggregates. 
+We write first_timestamp and last_timestamp as individual records +They will be grouped as min_first_timestamp and max_last_timestamp in the main table +""" +KAFKA_SESSION_REPLAY_EVENTS_TABLE_BASE_SQL = """ +CREATE TABLE IF NOT EXISTS {table_name} ON CLUSTER '{cluster}' +( + session_id VARCHAR, + team_id Int64, + distinct_id VARCHAR, + first_timestamp DateTime64(6, 'UTC'), + last_timestamp DateTime64(6, 'UTC'), + first_url Nullable(VARCHAR), + click_count Int64, + keypress_count Int64, + mouse_activity_count Int64, + active_milliseconds Int64 +) ENGINE = {engine} +""" + +SESSION_REPLAY_EVENTS_TABLE_BASE_SQL = """ +CREATE TABLE IF NOT EXISTS {table_name} ON CLUSTER '{cluster}' +( + -- part of order by so will aggregate correctly + session_id VARCHAR, + -- part of order by so will aggregate correctly + team_id Int64, + -- ClickHouse will pick any value of distinct_id for the session + -- this is fine since even if the distinct_id changes during a session + -- it will still (or should still) map to the same person + distinct_id VARCHAR, + min_first_timestamp SimpleAggregateFunction(min, DateTime64(6, 'UTC')), + max_last_timestamp SimpleAggregateFunction(max, DateTime64(6, 'UTC')), + first_url AggregateFunction(argMin, Nullable(VARCHAR), DateTime64(6, 'UTC')), + click_count SimpleAggregateFunction(sum, Int64), + keypress_count SimpleAggregateFunction(sum, Int64), + mouse_activity_count SimpleAggregateFunction(sum, Int64), + active_milliseconds SimpleAggregateFunction(sum, Int64) +) ENGINE = {engine} +""" + + +SESSION_REPLAY_EVENTS_DATA_TABLE_ENGINE = lambda: AggregatingMergeTree( + "session_replay_events", replication_scheme=ReplicationScheme.SHARDED +) + +SESSION_REPLAY_EVENTS_TABLE_SQL = lambda: ( + SESSION_REPLAY_EVENTS_TABLE_BASE_SQL + + """ + PARTITION BY toYYYYMM(min_first_timestamp) + -- order by is used by the aggregating merge tree engine to + -- identify candidates to merge, e.g. toDate(min_first_timestamp) + -- would mean we would have one row per day per session_id + -- if CH could completely merge to match the order by + -- it is also used to organise data to make queries faster + -- we want the fewest rows possible but also the fastest queries + -- since we query by date and not by time + -- and order by must be in order of increasing cardinality + -- so we order by date first, then team_id, then session_id + -- hopefully, this is a good balance between the two + ORDER BY (toDate(min_first_timestamp), team_id, session_id) +SETTINGS index_granularity=512 +""" +).format( + table_name=SESSION_REPLAY_EVENTS_DATA_TABLE(), + cluster=settings.CLICKHOUSE_CLUSTER, + engine=SESSION_REPLAY_EVENTS_DATA_TABLE_ENGINE(), +) + +KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL = lambda: KAFKA_SESSION_REPLAY_EVENTS_TABLE_BASE_SQL.format( + table_name="kafka_session_replay_events", + cluster=settings.CLICKHOUSE_CLUSTER, + engine=kafka_engine(topic=KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS), +) + +SESSION_REPLAY_EVENTS_TABLE_MV_SQL = lambda: """ +CREATE MATERIALIZED VIEW IF NOT EXISTS session_replay_events_mv ON CLUSTER '{cluster}' +TO {database}.{target_table} +AS SELECT +session_id, +team_id, +any(distinct_id) as distinct_id, +min(first_timestamp) AS min_first_timestamp, +max(last_timestamp) AS max_last_timestamp, +-- TRICKY: ClickHouse will pick a relatively random first_url +-- when it collapses the aggregating merge tree +-- unless we teach it what we want... 
+-- argMin ignores null values +-- so this will get the first non-null value of first_url +-- for each group of session_id and team_id +-- by min of first_timestamp in the batch +-- this is an aggregate function, not a simple aggregate function +-- so we have to write to argMinState, and query with argMinMerge +argMinState(first_url, first_timestamp) as first_url, +sum(click_count) as click_count, +sum(keypress_count) as keypress_count, +sum(mouse_activity_count) as mouse_activity_count, +sum(active_milliseconds) as active_milliseconds +FROM {database}.kafka_session_replay_events +group by session_id, team_id +""".format( + target_table="writable_session_replay_events", + cluster=settings.CLICKHOUSE_CLUSTER, + database=settings.CLICKHOUSE_DATABASE, +) + + +# Distributed engine tables are only created if CLICKHOUSE_REPLICATED + +# This table is responsible for writing to sharded_session_replay_events based on a sharding key. +WRITABLE_SESSION_REPLAY_EVENTS_TABLE_SQL = lambda: SESSION_REPLAY_EVENTS_TABLE_BASE_SQL.format( + table_name="writable_session_replay_events", + cluster=settings.CLICKHOUSE_CLUSTER, + engine=Distributed(data_table=SESSION_REPLAY_EVENTS_DATA_TABLE(), sharding_key="sipHash64(distinct_id)"), +) + +# This table is responsible for reading from session_replay_events on a cluster setting +DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL = lambda: SESSION_REPLAY_EVENTS_TABLE_BASE_SQL.format( + table_name="session_replay_events", + cluster=settings.CLICKHOUSE_CLUSTER, + engine=Distributed(data_table=SESSION_REPLAY_EVENTS_DATA_TABLE(), sharding_key="sipHash64(distinct_id)"), +) + + +DROP_SESSION_REPLAY_EVENTS_TABLE_SQL = lambda: ( + f"DROP TABLE IF EXISTS {SESSION_REPLAY_EVENTS_DATA_TABLE()} ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'" +) diff --git a/posthog/queries/session_recordings/test/session_replay_sql.py b/posthog/queries/session_recordings/test/session_replay_sql.py new file mode 100644 index 00000000000..528d26e3665 --- /dev/null +++ b/posthog/queries/session_recordings/test/session_replay_sql.py @@ -0,0 +1,119 @@ +from datetime import datetime +from typing import Optional + +from dateutil.parser import parse +from dateutil.relativedelta import relativedelta + +from posthog.kafka_client.client import ClickhouseProducer +from posthog.kafka_client.topics import KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS +from posthog.models.event.util import format_clickhouse_timestamp +from posthog.utils import cast_timestamp_or_now + +INSERT_SINGLE_SESSION_REPLAY = """ +INSERT INTO sharded_session_replay_events ( + session_id, + team_id, + distinct_id, + min_first_timestamp, + max_last_timestamp, + first_url, + click_count, + keypress_count, + mouse_activity_count, + active_milliseconds +) +SELECT + %(session_id)s, + %(team_id)s, + %(distinct_id)s, + toDateTime64(%(first_timestamp)s, 6, 'UTC'), + toDateTime64(%(last_timestamp)s, 6, 'UTC'), + argMinState(cast(%(first_url)s, 'Nullable(String)'), toDateTime64(%(first_timestamp)s, 6, 'UTC')), + %(click_count)s, + %(keypress_count)s, + %(mouse_activity_count)s, + %(active_milliseconds)s +""" + + +def _sensible_first_timestamp( + first_timestamp: Optional[str | datetime], last_timestamp: Optional[str | datetime] +) -> str: + """ + Normalise the first timestamp to be used in the session replay summary. 
+ If it is not provided but there is a last_timestamp, use an hour before that last_timestamp + Otherwise we use the current time + """ + sensible_timestamp = None + if first_timestamp is not None: + # TRICKY: check it not a string to avoid needing to check if it is a datetime or a Fakedatetime + if not isinstance(first_timestamp, str): + sensible_timestamp = first_timestamp.isoformat() + else: + sensible_timestamp = first_timestamp + else: + if last_timestamp is not None: + if isinstance(last_timestamp, str): + last_timestamp = parse(last_timestamp) + + sensible_timestamp = (last_timestamp - relativedelta(seconds=3600)).isoformat() + + return format_clickhouse_timestamp(cast_timestamp_or_now(sensible_timestamp)) + + +def _sensible_last_timestamp( + first_timestamp: Optional[str | datetime], last_timestamp: Optional[str | datetime] +) -> str: + """ + Normalise the last timestamp to be used in the session replay summary. + If it is not provided but there is a first_timestamp, use an hour after that last_timestamp + Otherwise we use the current time + """ + sensible_timestamp = None + if last_timestamp is not None: + # TRICKY: check it not a string to avoid needing to check if it is a datetime or a Fakedatetime + if not isinstance(last_timestamp, str): + sensible_timestamp = last_timestamp.isoformat() + else: + sensible_timestamp = last_timestamp + else: + if first_timestamp is not None: + if isinstance(first_timestamp, str): + first_timestamp = parse(first_timestamp) + + sensible_timestamp = (first_timestamp - relativedelta(seconds=3600)).isoformat() + + return format_clickhouse_timestamp(cast_timestamp_or_now(sensible_timestamp)) + + +def produce_replay_summary( + team_id: int, + session_id: Optional[str] = None, + distinct_id: Optional[str] = None, + first_timestamp: Optional[str | datetime] = None, + last_timestamp: Optional[str | datetime] = None, + first_url: Optional[str | None] = None, + click_count: Optional[int] = None, + keypress_count: Optional[int] = None, + mouse_activity_count: Optional[int] = None, + active_milliseconds: Optional[float] = None, +): + + first_timestamp = _sensible_first_timestamp(first_timestamp, last_timestamp) + last_timestamp = _sensible_last_timestamp(first_timestamp, last_timestamp) + + data = { + "session_id": session_id or "1", + "team_id": team_id, + "distinct_id": distinct_id or "user", + "first_timestamp": format_clickhouse_timestamp(cast_timestamp_or_now(first_timestamp)), + "last_timestamp": format_clickhouse_timestamp(cast_timestamp_or_now(last_timestamp)), + "first_url": first_url, + "click_count": click_count or 0, + "keypress_count": keypress_count or 0, + "mouse_activity_count": mouse_activity_count or 0, + "active_milliseconds": active_milliseconds or 0, + } + p = ClickhouseProducer() + # because this is in a test it will write directly using SQL not really with Kafka + p.produce(topic=KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS, sql=INSERT_SINGLE_SESSION_REPLAY, data=data) diff --git a/posthog/queries/session_recordings/test/test_session_replay_summaries.py b/posthog/queries/session_recordings/test/test_session_replay_summaries.py new file mode 100644 index 00000000000..8c8fad0fa2f --- /dev/null +++ b/posthog/queries/session_recordings/test/test_session_replay_summaries.py @@ -0,0 +1,157 @@ +from datetime import datetime, timedelta +from uuid import uuid4 + +import pytz +from dateutil.parser import isoparse + +from posthog.clickhouse.client import sync_execute +from posthog.models import Team +from posthog.models.event.util import 
format_clickhouse_timestamp +from posthog.queries.app_metrics.serializers import AppMetricsRequestSerializer +from posthog.queries.session_recordings.test.session_replay_sql import produce_replay_summary +from posthog.test.base import BaseTest, ClickhouseTestMixin, snapshot_clickhouse_queries + + +def make_filter(serializer_klass=AppMetricsRequestSerializer, **kwargs) -> AppMetricsRequestSerializer: + filter = serializer_klass(data=kwargs) + filter.is_valid(raise_exception=True) + return filter + + +class SessionReplaySummaryQuery: + def __init__(self, team: Team, session_id: str, reference_date: str): + self.team = team + self.session_id = session_id + self.reference_date = reference_date + + def list_all(self): + params = { + "team_id": self.team.pk, + "start_time": format_clickhouse_timestamp(isoparse(self.reference_date) - timedelta(hours=48)), + "end_time": format_clickhouse_timestamp(isoparse(self.reference_date) + timedelta(hours=48)), + "session_ids": (self.session_id,), + } + + results = sync_execute( + """ + select + session_id, + any(team_id), + any(distinct_id), + min(min_first_timestamp), + max(max_last_timestamp), + dateDiff('SECOND', min(min_first_timestamp), max(max_last_timestamp)) as duration, + argMinMerge(first_url) as first_url, + sum(click_count), + sum(keypress_count), + sum(mouse_activity_count), + round((sum(active_milliseconds)/1000)/duration, 2) as active_time + from session_replay_events + prewhere team_id = %(team_id)s + and min_first_timestamp >= %(start_time)s + and max_last_timestamp <= %(end_time)s + and session_id in %(session_ids)s + group by session_id + """, + params, + ) + return results + + +class TestReceiveSummarizedSessionReplays(ClickhouseTestMixin, BaseTest): + @snapshot_clickhouse_queries + def test_session_replay_summaries_can_be_queried(self): + session_id = str(uuid4()) + + produce_replay_summary( + session_id=session_id, + team_id=self.team.pk, + # can CH handle a timestamp with no T + first_timestamp="2023-04-27 10:00:00.309", + last_timestamp="2023-04-27 14:20:42.237", + distinct_id=str(self.user.distinct_id), + first_url="https://first-url-ingested.com", + click_count=2, + keypress_count=2, + mouse_activity_count=2, + active_milliseconds=33624 * 1000 * 0.3, # 30% of the total expected duration + ) + + produce_replay_summary( + session_id=session_id, + team_id=self.team.pk, + first_timestamp="2023-04-27T19:17:38.116", + last_timestamp="2023-04-27T19:17:38.117", + distinct_id=str(self.user.distinct_id), + first_url="https://second-url-ingested.com", + click_count=2, + keypress_count=2, + mouse_activity_count=2, + ) + produce_replay_summary( + session_id=session_id, + team_id=self.team.pk, + first_timestamp="2023-04-27T19:18:24.597", + last_timestamp="2023-04-27T19:20:24.597", + distinct_id=str(self.user.distinct_id), + first_url="https://third-url-ingested.com", + click_count=2, + keypress_count=2, + mouse_activity_count=2, + ) + + # same session but ends more than 2 days from start so excluded + produce_replay_summary( + session_id=session_id, + team_id=self.team.pk, + first_timestamp="2023-04-26T19:18:24.597", + last_timestamp="2023-04-29T19:20:24.597", + distinct_id=str(self.user.distinct_id), + first_url=None, + click_count=2, + keypress_count=2, + mouse_activity_count=2, + ) + + # same session but a different team so excluded + produce_replay_summary( + session_id=session_id, + team_id=self.team.pk + 100, + first_timestamp="2023-04-26T19:18:24.597", + last_timestamp="2023-04-28T19:20:24.597", + distinct_id=str(self.user.distinct_id), + 
first_url=None, + click_count=2, + keypress_count=2, + mouse_activity_count=2, + ) + + # different session so excluded + produce_replay_summary( + session_id=str(uuid4()), + team_id=self.team.pk, + first_timestamp="2023-04-26T19:18:24.597", + last_timestamp="2023-04-26T19:20:24.597", + distinct_id=str(self.user.distinct_id), + first_url=None, + click_count=2, + keypress_count=2, + mouse_activity_count=2, + ) + + results = SessionReplaySummaryQuery(self.team, session_id, "2023-04-26T19:18:24.597").list_all() + assert results == [ + ( + session_id, + self.team.pk, + str(self.user.distinct_id), + datetime(2023, 4, 27, 10, 0, 0, 309000, tzinfo=pytz.UTC), + datetime(2023, 4, 27, 19, 20, 24, 597000, tzinfo=pytz.UTC), + 33624, + "https://first-url-ingested.com", + 6, + 6, + 6, + 0.3, + ) + ] diff --git a/posthog/test/base.py b/posthog/test/base.py index f13d170d0cf..549488c0e5e 100644 --- a/posthog/test/base.py +++ b/posthog/test/base.py @@ -42,6 +42,11 @@ from posthog.models.session_recording_event.sql import ( DROP_SESSION_RECORDING_EVENTS_TABLE_SQL, SESSION_RECORDING_EVENTS_TABLE_SQL, ) +from posthog.models.session_replay_event.sql import ( + SESSION_REPLAY_EVENTS_TABLE_SQL, + DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL, + DROP_SESSION_REPLAY_EVENTS_TABLE_SQL, +) from posthog.settings.utils import get_from_env, str_to_bool persons_cache_tests: List[Dict[str, Any]] = [] @@ -660,6 +665,7 @@ class ClickhouseDestroyTablesMixin(BaseTest): TRUNCATE_PERSON_DISTINCT_ID_TABLE_SQL, TRUNCATE_PERSON_DISTINCT_ID2_TABLE_SQL, DROP_SESSION_RECORDING_EVENTS_TABLE_SQL(), + DROP_SESSION_REPLAY_EVENTS_TABLE_SQL(), TRUNCATE_GROUPS_TABLE_SQL, TRUNCATE_COHORTPEOPLE_TABLE_SQL, TRUNCATE_PERSON_STATIC_COHORT_TABLE_SQL, @@ -667,10 +673,19 @@ class ClickhouseDestroyTablesMixin(BaseTest): ] ) run_clickhouse_statement_in_parallel( - [EVENTS_TABLE_SQL(), PERSONS_TABLE_SQL(), SESSION_RECORDING_EVENTS_TABLE_SQL()] + [ + EVENTS_TABLE_SQL(), + PERSONS_TABLE_SQL(), + SESSION_RECORDING_EVENTS_TABLE_SQL(), + SESSION_REPLAY_EVENTS_TABLE_SQL(), + ] ) run_clickhouse_statement_in_parallel( - [DISTRIBUTED_EVENTS_TABLE_SQL(), DISTRIBUTED_SESSION_RECORDING_EVENTS_TABLE_SQL()] + [ + DISTRIBUTED_EVENTS_TABLE_SQL(), + DISTRIBUTED_SESSION_RECORDING_EVENTS_TABLE_SQL(), + DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL(), + ] ) def tearDown(self): @@ -682,13 +697,24 @@ class ClickhouseDestroyTablesMixin(BaseTest): DROP_PERSON_TABLE_SQL, TRUNCATE_PERSON_DISTINCT_ID_TABLE_SQL, DROP_SESSION_RECORDING_EVENTS_TABLE_SQL(), + DROP_SESSION_REPLAY_EVENTS_TABLE_SQL(), + ] + ) + + run_clickhouse_statement_in_parallel( + [ + EVENTS_TABLE_SQL(), + PERSONS_TABLE_SQL(), + SESSION_RECORDING_EVENTS_TABLE_SQL(), + SESSION_REPLAY_EVENTS_TABLE_SQL(), ] ) run_clickhouse_statement_in_parallel( - [EVENTS_TABLE_SQL(), PERSONS_TABLE_SQL(), SESSION_RECORDING_EVENTS_TABLE_SQL()] - ) - run_clickhouse_statement_in_parallel( - [DISTRIBUTED_EVENTS_TABLE_SQL(), DISTRIBUTED_SESSION_RECORDING_EVENTS_TABLE_SQL()] + [ + DISTRIBUTED_EVENTS_TABLE_SQL(), + DISTRIBUTED_SESSION_RECORDING_EVENTS_TABLE_SQL(), + DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL(), + ] )
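
Notes for reviewers: querying session_replay_events

Because sharded_session_replay_events is an AggregatingMergeTree, a session only collapses to a single row once background merges run, so reads must always re-aggregate and must finalise first_url (written via argMinState in the materialized view) with argMinMerge. A minimal read sketch, mirroring the query used in test_session_replay_summaries.py; the team_id and session_id literals are illustrative only:

    SELECT
        session_id,
        any(team_id) AS team_id,
        any(distinct_id) AS distinct_id,
        min(min_first_timestamp) AS start_time,
        max(max_last_timestamp) AS end_time,
        -- first_url is an AggregateFunction(argMin, ...) column,
        -- so it has to be read back with argMinMerge
        argMinMerge(first_url) AS first_url,
        sum(click_count) AS click_count,
        sum(keypress_count) AS keypress_count,
        sum(mouse_activity_count) AS mouse_activity_count,
        sum(active_milliseconds) AS active_milliseconds
    FROM session_replay_events
    WHERE team_id = 1               -- illustrative value
      AND session_id = 'abcf-efg'   -- illustrative value
    GROUP BY session_id

The GROUP BY plus the merge/sum functions return correct totals whether or not ClickHouse has already merged a session's parts, which is why the description above says the table stores "in theory, although not in practice" only one row per session.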