diff --git a/.github/workflows/ci-backend.yml b/.github/workflows/ci-backend.yml
index 89660936549..3713a3dbb3b 100644
--- a/.github/workflows/ci-backend.yml
+++ b/.github/workflows/ci-backend.yml
@@ -117,8 +117,7 @@ jobs:
if: steps.cache-backend-tests.outputs.cache-hit != 'true'
run: |
cd current
- python -m pip install -r requirements-dev.txt
- python -m pip install -r requirements.txt
+ python -m pip install -r requirements.txt -r requirements-dev.txt
- name: Check for syntax errors, import sort, and code style violations
run: |
@@ -199,8 +198,7 @@ jobs:
- name: Install python dependencies
if: steps.cache-backend-tests.outputs.cache-hit != 'true'
run: |
- python -m pip install -r requirements-dev.txt
- python -m pip install -r requirements.txt
+ python -m pip install -r requirements.txt -r requirements-dev.txt
- uses: actions/checkout@v3
with:
@@ -210,8 +208,7 @@ jobs:
run: |
# We also need the requirements for the master branch here,
# so that we can run migrations up to master.
- python -m pip install -r requirements-dev.txt
- python -m pip install -r requirements.txt
+ python -m pip install -r requirements.txt -r requirements-dev.txt
python manage.py migrate
- uses: actions/checkout@v3
@@ -356,8 +353,7 @@ jobs:
- name: Install python dependencies
if: steps.cache-backend-tests.outputs.cache-hit != 'true'
run: |
- python -m pip install -r master/requirements-dev.txt
- python -m pip install -r master/requirements.txt
+ python -m pip install -r master/requirements.txt -r master/requirements-dev.txt
- name: Wait for Clickhouse & Kafka
run: master/bin/check_kafka_clickhouse_up
@@ -376,9 +372,7 @@ jobs:
- name: Install requirements.txt dependencies with pip at current branch
run: |
cd current
- python -m pip install --upgrade pip
- python -m pip install -r requirements.txt
- python -m pip install freezegun fakeredis pytest pytest-mock pytest-django syrupy
+ python -m pip install -r requirements.txt -r requirements-dev.txt
- name: Link posthog-cloud at current branch
run: |
diff --git a/.github/workflows/pr-deploy.yml b/.github/workflows/pr-deploy.yml
index e11e73db2fa..b61554097df 100644
--- a/.github/workflows/pr-deploy.yml
+++ b/.github/workflows/pr-deploy.yml
@@ -61,6 +61,7 @@ jobs:
export RELEASE_NAME=posthog
export NAMESPACE=pr-$PR_NUM-${BRANCH_NAME//\//-}
export NAMESPACE=${NAMESPACE:0:38}
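+ # strip a trailing "-" left by the 38-character truncation above, since the namespace/hostname can't end with a hyphen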
+ export NAMESPACE=${NAMESPACE%%-}
export HOSTNAME=$NAMESPACE
export TAILNET_NAME=hedgehog-kitefin
export TS_AUTHKEY=${{ secrets.TAILSCALE_SERVICE_AUTHKEY }}
diff --git a/.run/Plugin Server.run.xml b/.run/Plugin Server.run.xml
index cfa9467dac7..7f90cb5f797 100644
--- a/.run/Plugin Server.run.xml
+++ b/.run/Plugin Server.run.xml
@@ -13,7 +13,8 @@
+
-
\ No newline at end of file
+
diff --git a/frontend/src/scenes/session-recordings/player/utils/segmenter.ts b/frontend/src/scenes/session-recordings/player/utils/segmenter.ts
index 87a8596670b..8af9840bd9d 100644
--- a/frontend/src/scenes/session-recordings/player/utils/segmenter.ts
+++ b/frontend/src/scenes/session-recordings/player/utils/segmenter.ts
@@ -2,6 +2,14 @@ import { EventType, IncrementalSource, eventWithTime } from '@rrweb/types'
import { Dayjs } from 'lib/dayjs'
import { RecordingSegment, RecordingSnapshot } from '~/types'
+/**
+ * This file is copied into the plugin server to calculate activeMilliseconds on ingestion
+ * plugin-server/src/main/ingestion-queues/session-recording/snapshot-segmenter.ts
+ *
+ * Changes here should be reflected there
+ * TODO add code sharing between plugin-server and front-end so that this duplication is unnecessary
+ */
+
const activeSources = [
IncrementalSource.MouseMove,
IncrementalSource.MouseInteraction,
diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts
index dd2bcbcc502..c6e3cda6cc4 100644
--- a/plugin-server/src/config/config.ts
+++ b/plugin-server/src/config/config.ts
@@ -113,13 +113,15 @@ export function getDefaultConfig(): PluginsServerConfig {
USE_KAFKA_FOR_SCHEDULED_TASKS: true,
CLOUD_DEPLOYMENT: 'default', // Used as a Sentry tag
- SESSION_RECORDING_BLOB_PROCESSING_TEAMS: '', // TODO: BW Change this to 'all' when we release it fully
+ SESSION_RECORDING_BLOB_PROCESSING_TEAMS: '', // TODO: Change this to 'all' when we release it fully
SESSION_RECORDING_LOCAL_DIRECTORY: '.tmp/sessions',
SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS: 60 * 10, // NOTE: 10 minutes
SESSION_RECORDING_MAX_BUFFER_SIZE_KB: ['dev', 'test'].includes(process.env.NODE_ENV || 'undefined')
? 1024 // NOTE: ~1MB in dev or test, so that even with gzipped content we still flush pretty frequently
: 1024 * 50, // ~50MB after compression in prod
SESSION_RECORDING_REMOTE_FOLDER: 'session_recordings',
+
+ SESSION_RECORDING_SUMMARY_INGESTION_ENABLED_TEAMS: '', // TODO: Change this to 'all' when we release it fully
}
}
diff --git a/plugin-server/src/config/kafka-topics.ts b/plugin-server/src/config/kafka-topics.ts
index f9f2e88fd3b..b03ef5f24ca 100644
--- a/plugin-server/src/config/kafka-topics.ts
+++ b/plugin-server/src/config/kafka-topics.ts
@@ -10,6 +10,7 @@ export const KAFKA_PERSON = `${prefix}clickhouse_person${suffix}`
export const KAFKA_PERSON_UNIQUE_ID = `${prefix}clickhouse_person_unique_id${suffix}`
export const KAFKA_PERSON_DISTINCT_ID = `${prefix}clickhouse_person_distinct_id${suffix}`
export const KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = `${prefix}clickhouse_session_recording_events${suffix}`
+export const KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = `${prefix}clickhouse_session_replay_events${suffix}`
export const KAFKA_PERFORMANCE_EVENTS = `${prefix}clickhouse_performance_events${suffix}`
export const KAFKA_EVENTS_PLUGIN_INGESTION = `${prefix}events_plugin_ingestion${suffix}`
export const KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW = `${prefix}events_plugin_ingestion_overflow${suffix}`
diff --git a/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts b/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts
index 473f68f023d..a867ca42bcd 100644
--- a/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts
+++ b/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts
@@ -10,8 +10,7 @@ import { instrumentEachBatch, setupEventHandlers } from './kafka-queue'
import { latestOffsetTimestampGauge } from './metrics'
// The valid task types that can be scheduled.
-// TODO: not sure if there is another place that defines these but it would be
-// good to unify.
+// TODO: not sure if there is another place that defines these but it would be good to unify.
const taskTypes = ['runEveryMinute', 'runEveryHour', 'runEveryDay'] as const
export const startScheduledTasksConsumer = async ({
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
index c8be1315fd3..3864fc9530d 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
@@ -1,8 +1,10 @@
import { PluginEvent } from '@posthog/plugin-scaffold'
+import { captureException } from '@sentry/node'
import { HighLevelProducer as RdKafkaProducer, Message, NumberNullUndefined } from 'node-rdkafka-acosom'
import {
KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS,
+ KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS,
KAFKA_PERFORMANCE_EVENTS,
KAFKA_SESSION_RECORDING_EVENTS,
KAFKA_SESSION_RECORDING_EVENTS_DLQ,
@@ -14,7 +16,12 @@ import { createKafkaProducer, disconnectProducer, flushProducer, produce } from
import { PipelineEvent, RawEventMessage, Team } from '../../../types'
import { KafkaConfig } from '../../../utils/db/hub'
import { status } from '../../../utils/status'
-import { createPerformanceEvent, createSessionRecordingEvent } from '../../../worker/ingestion/process-event'
+import {
+ createPerformanceEvent,
+ createSessionRecordingEvent,
+ createSessionReplayEvent,
+ SummarizedSessionRecordingEvent,
+} from '../../../worker/ingestion/process-event'
import { TeamManager } from '../../../worker/ingestion/team-manager'
import { parseEventTimestamp } from '../../../worker/ingestion/timestamps'
import { eventDroppedCounter } from '../metrics'
@@ -25,12 +32,14 @@ export const startSessionRecordingEventsConsumer = async ({
consumerMaxBytes,
consumerMaxBytesPerPartition,
consumerMaxWaitMs,
+ summaryIngestionEnabledTeams,
}: {
teamManager: TeamManager
kafkaConfig: KafkaConfig
consumerMaxBytes: number
consumerMaxBytesPerPartition: number
consumerMaxWaitMs: number
+ summaryIngestionEnabledTeams: string
}) => {
/*
For Session Recordings we need to prepare the data for ClickHouse.
@@ -59,7 +68,13 @@ export const startSessionRecordingEventsConsumer = async ({
const connectionConfig = createRdConnectionConfigFromEnvVars(kafkaConfig)
const producer = await createKafkaProducer(connectionConfig)
- const eachBatchWithContext = eachBatch({ teamManager, producer })
+
+ const eachBatchWithContext = eachBatch({
+ teamManager,
+ producer,
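+ // 'all' means no filtering (null); otherwise a comma-separated allowlist of team ids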
+ summaryEnabledTeams:
+ summaryIngestionEnabledTeams === 'all' ? null : summaryIngestionEnabledTeams.split(',').map((teamId) => parseInt(teamId, 10)),
+ })
// Create a node-rdkafka consumer that fetches batches of messages, runs
// eachBatchWithContext, then commits offsets for the batch.
@@ -84,7 +99,15 @@ export const startSessionRecordingEventsConsumer = async ({
}
export const eachBatch =
- ({ teamManager, producer }: { teamManager: TeamManager; producer: RdKafkaProducer }) =>
+ ({
+ teamManager,
+ producer,
+ summaryEnabledTeams,
+ }: {
+ teamManager: TeamManager
+ producer: RdKafkaProducer
+ summaryEnabledTeams: number[] | null
+ }) =>
async (messages: Message[]) => {
// To start with, we simply process each message in turn,
// without attempting to perform any concurrency. There is a lot
@@ -105,7 +128,7 @@ export const eachBatch =
// DependencyUnavailableError error to distinguish between
// intermittent and permanent errors.
const pendingProduceRequests: Promise<NumberNullUndefined>[] = []
- const eachMessageWithContext = eachMessage({ teamManager, producer })
+ const eachMessageWithContext = eachMessage({ teamManager, producer, summaryEnabledTeams })
for (const message of messages) {
const results = await retryOnDependencyUnavailableError(() => eachMessageWithContext(message))
@@ -148,7 +171,15 @@ export const eachBatch =
}
const eachMessage =
- ({ teamManager, producer }: { teamManager: TeamManager; producer: RdKafkaProducer }) =>
+ ({
+ teamManager,
+ producer,
+ summaryEnabledTeams,
+ }: {
+ teamManager: TeamManager
+ producer: RdKafkaProducer
+ summaryEnabledTeams: number[] | null
+ }) =>
async (message: Message) => {
// For each message, we:
//
@@ -252,7 +283,23 @@ const eachMessage =
event.properties || {}
)
- return [
+ let replayRecord: null | SummarizedSessionRecordingEvent = null
+ try {
+ if (summaryEnabledTeams === null || summaryEnabledTeams?.includes(team.id)) {
+ replayRecord = createSessionReplayEvent(
+ messagePayload.uuid,
+ team.id,
+ messagePayload.distinct_id,
+ event.ip,
+ event.properties || {}
+ )
+ }
+ } catch (e) {
+ status.warn('??', 'session_replay_summarizer_error', { error: e })
+ captureException(e)
+ }
+
+ const producePromises = [
produce({
producer,
topic: KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS,
@@ -260,6 +307,18 @@ const eachMessage =
key: message.key ? Buffer.from(message.key) : null,
}),
]
+
+ if (replayRecord) {
+ producePromises.push(
+ produce({
+ producer,
+ topic: KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS,
+ value: Buffer.from(JSON.stringify(replayRecord)),
+ key: message.key ? Buffer.from(message.key) : null,
+ })
+ )
+ }
+ return producePromises
} else if (event.event === '$performance_event') {
const clickHouseRecord = createPerformanceEvent(
messagePayload.uuid,
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/snapshot-segmenter.ts b/plugin-server/src/main/ingestion-queues/session-recording/snapshot-segmenter.ts
new file mode 100644
index 00000000000..3e2cb6bcadd
--- /dev/null
+++ b/plugin-server/src/main/ingestion-queues/session-recording/snapshot-segmenter.ts
@@ -0,0 +1,111 @@
+/**
+ * This file is a cut-down version of the segmenter.ts
+ * https://github.com/PostHog/posthog/blob/db2deaf650d2eca9addba5e3f304a17a21041f25/frontend/src/scenes/session-recordings/player/utils/segmenter.ts
+ *
+ * It has been modified so that it does not need the same dependencies.
+ * Any changes may need to be kept in sync between the two files.
+ */
+
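+// rrweb IncrementalSource values: MouseMove, MouseInteraction, Scroll, ViewportResize, Input, TouchMove, MediaInteraction and Drag (mirrors activeSources in the front-end segmenter)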
+const activeSources = [1, 2, 3, 4, 5, 6, 7, 12]
+
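+// an active segment is closed once this many milliseconds pass without another active event (see createSegments)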
+const ACTIVITY_THRESHOLD_MS = 5000
+
+export interface RRWebEventSummaryData {
+ href?: string
+ source?: number
+ payload?: Record<string, any>
+}
+
+export interface RRWebEventSummary {
+ timestamp: number
+ type: number
+ data: RRWebEventSummaryData
+ windowId: string
+}
+
+interface RecordingSegment {
+ kind: 'window' | 'buffer' | 'gap'
+ startTimestamp: number // Epoch time that the segment starts
+ endTimestamp: number // Epoch time that the segment ends
+ durationMs: number
+ windowId?: string
+ isActive: boolean
+}
+
+const isActiveEvent = (event: RRWebEventSummary): boolean => {
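+ // type 3 is an rrweb IncrementalSnapshot; only those events carry a data.source describing the interaction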
+ return event.type === 3 && activeSources.includes(event.data?.source || -1)
+}
+
+const createSegments = (snapshots: RRWebEventSummary[]): RecordingSegment[] => {
+ let segments: RecordingSegment[] = []
+ let activeSegment!: Partial<RecordingSegment>
+ let lastActiveEventTimestamp = 0
+
+ snapshots.forEach((snapshot) => {
+ const eventIsActive = isActiveEvent(snapshot)
+ lastActiveEventTimestamp = eventIsActive ? snapshot.timestamp : lastActiveEventTimestamp
+
+ // When do we create a new segment?
+ // 1. If we don't have one yet
+ let isNewSegment = !activeSegment
+
+ // 2. If it is currently inactive but a new "active" event comes in
+ if (eventIsActive && !activeSegment?.isActive) {
+ isNewSegment = true
+ }
+
+ // 3. If it is currently active but no new active event has been seen for the activity threshold
+ if (activeSegment?.isActive && lastActiveEventTimestamp + ACTIVITY_THRESHOLD_MS < snapshot.timestamp) {
+ isNewSegment = true
+ }
+
+ // 4. If windowId changes we create a new segment
+ if (activeSegment?.windowId !== snapshot.windowId) {
+ isNewSegment = true
+ }
+
+ if (isNewSegment) {
+ if (activeSegment) {
+ segments.push(activeSegment as RecordingSegment)
+ }
+
+ activeSegment = {
+ kind: 'window',
+ startTimestamp: snapshot.timestamp,
+ windowId: snapshot.windowId,
+ isActive: eventIsActive,
+ }
+ }
+
+ activeSegment.endTimestamp = snapshot.timestamp
+ })
+
+ if (activeSegment) {
+ segments.push(activeSegment as RecordingSegment)
+ }
+
+ segments = segments.map((segment) => {
+ // These can all be done in a loop at the end...
+ segment.durationMs = segment.endTimestamp - segment.startTimestamp
+ return segment
+ })
+
+ return segments
+}
+
+/**
+ * TODO add code sharing between plugin-server and front-end so that this method can
+ * call the same createSegments function as the front-end
+ */
+export const activeMilliseconds = (snapshots: RRWebEventSummary[]): number => {
+ const segments = createSegments(snapshots)
+ return segments.reduce((acc, segment) => {
+ if (segment.isActive) {
+ // if the segment is active but has no duration we count it as 1ms
+ // to distinguish it from segments with no activity at all
+ return acc + Math.max(1, segment.durationMs)
+ }
+
+ return acc
+ }, 0)
+}
diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts
index 41317480afc..59a01c43989 100644
--- a/plugin-server/src/main/pluginsServer.ts
+++ b/plugin-server/src/main/pluginsServer.ts
@@ -405,6 +405,7 @@ export async function startPluginsServer(
consumerMaxBytes: serverConfig.KAFKA_CONSUMPTION_MAX_BYTES,
consumerMaxBytesPerPartition: serverConfig.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
consumerMaxWaitMs: serverConfig.KAFKA_CONSUMPTION_MAX_WAIT_MS,
+ summaryIngestionEnabledTeams: serverConfig.SESSION_RECORDING_SUMMARY_INGESTION_ENABLED_TEAMS,
})
stopSessionRecordingEventsConsumer = stop
joinSessionRecordingEventsConsumer = join
diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts
index 510f75b4d54..baf9c479050 100644
--- a/plugin-server/src/types.ts
+++ b/plugin-server/src/types.ts
@@ -188,6 +188,8 @@ export interface PluginsServerConfig {
SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS: number
SESSION_RECORDING_MAX_BUFFER_SIZE_KB: number
SESSION_RECORDING_REMOTE_FOLDER: string
+
+ SESSION_RECORDING_SUMMARY_INGESTION_ENABLED_TEAMS: string
}
export interface Hub extends PluginsServerConfig {
diff --git a/plugin-server/src/worker/ingestion/process-event.ts b/plugin-server/src/worker/ingestion/process-event.ts
index b13f3d54773..6ff9e9f09ff 100644
--- a/plugin-server/src/worker/ingestion/process-event.ts
+++ b/plugin-server/src/worker/ingestion/process-event.ts
@@ -3,6 +3,7 @@ import { PluginEvent, Properties } from '@posthog/plugin-scaffold'
import * as Sentry from '@sentry/node'
import { DateTime } from 'luxon'
+import { activeMilliseconds, RRWebEventSummary } from '../../main/ingestion-queues/session-recording/snapshot-segmenter'
import {
Element,
GroupTypeIndex,
@@ -272,6 +273,81 @@ export const createSessionRecordingEvent = (
return data
}
+
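+// One flattened row per $snapshot message, produced to the session replay Kafka topic for ClickHouse to aggregate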
+export interface SummarizedSessionRecordingEvent {
+ uuid: string
+ first_timestamp: string
+ last_timestamp: string
+ team_id: number
+ distinct_id: string
+ session_id: string
+ first_url: string | undefined
+ click_count: number
+ keypress_count: number
+ mouse_activity_count: number
+ active_milliseconds: number
+}
+
+export const createSessionReplayEvent = (
+ uuid: string,
+ team_id: number,
+ distinct_id: string,
+ ip: string | null,
+ properties: Properties
+) => {
+ const eventsSummaries: RRWebEventSummary[] = properties['$snapshot_data']?.['events_summary'] || []
+
+ const timestamps = eventsSummaries
+ .filter((eventSummary: RRWebEventSummary) => {
+ return !!eventSummary?.timestamp
+ })
+ .map((eventSummary: RRWebEventSummary) => {
+ return castTimestampOrNow(DateTime.fromMillis(eventSummary.timestamp), TimestampFormat.ClickHouse)
+ })
+ .sort()
+
+ if (eventsSummaries.length === 0 || timestamps.length === 0) {
+ return null
+ }
+
+ let clickCount = 0
+ let keypressCount = 0
+ let mouseActivity = 0
+ let url: string | undefined = undefined
+ eventsSummaries.forEach((eventSummary: RRWebEventSummary) => {
+ if (eventSummary.type === 3) {
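+ // type 3 is an rrweb incremental snapshot: source 2 is MouseInteraction (clicks), source 5 is Input (keypresses)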
+ mouseActivity += 1
+ if (eventSummary.data?.source === 2) {
+ clickCount += 1
+ }
+ if (eventSummary.data?.source === 5) {
+ keypressCount += 1
+ }
+ }
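+ // capture the first non-empty top-level data.href; hrefs nested inside a payload are deliberately ignored (see tests)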
+ if (!!eventSummary.data?.href?.trim().length && url === undefined) {
+ url = eventSummary.data.href
+ }
+ })
+
+ const activeTime = activeMilliseconds(eventsSummaries)
+
+ const data: SummarizedSessionRecordingEvent = {
+ uuid,
+ team_id: team_id,
+ distinct_id: distinct_id,
+ session_id: properties['$session_id'],
+ first_timestamp: timestamps[0],
+ last_timestamp: timestamps[timestamps.length - 1],
+ click_count: clickCount,
+ keypress_count: keypressCount,
+ mouse_activity_count: mouseActivity,
+ first_url: url,
+ active_milliseconds: activeTime,
+ }
+
+ return data
+}
+
export function createPerformanceEvent(uuid: string, team_id: number, distinct_id: string, properties: Properties) {
const data: Partial = {
uuid,
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
index 6e9fc72093c..5edc4990806 100644
--- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
@@ -18,7 +18,7 @@ describe('session-recordings-consumer', () => {
beforeEach(() => {
postgres = new Pool({ connectionString: defaultConfig.DATABASE_URL })
teamManager = new TeamManager(postgres, {} as any)
- eachBachWithDependencies = eachBatch({ producer, teamManager })
+ eachBachWithDependencies = eachBatch({ producer, teamManager, summaryEnabledTeams: [] })
})
afterEach(() => {
@@ -63,4 +63,82 @@ describe('session-recordings-consumer', () => {
// Should have sent to the DLQ.
expect(producer.produce).toHaveBeenCalledTimes(1)
})
+
+ test('eachBatch emits to only one topic', async () => {
+ const organizationId = await createOrganization(postgres)
+ const teamId = await createTeam(postgres, organizationId)
+
+ await eachBachWithDependencies([
+ {
+ key: 'test',
+ value: JSON.stringify({ team_id: teamId, data: JSON.stringify({ event: '$snapshot' }) }),
+ timestamp: 123,
+ },
+ ])
+
+ expect(producer.produce).toHaveBeenCalledTimes(1)
+ })
+
+ test('eachBatch can emit to two topics', async () => {
+ const organizationId = await createOrganization(postgres)
+ const teamId = await createTeam(postgres, organizationId)
+
+ const eachBachWithDependencies: any = eachBatch({ producer, teamManager, summaryEnabledTeams: null })
+
+ await eachBachWithDependencies([
+ {
+ key: 'test',
+ value: JSON.stringify({
+ team_id: teamId,
+ data: JSON.stringify({
+ event: '$snapshot',
+ properties: { $snapshot_data: { events_summary: [{ timestamp: 12345 }] } },
+ }),
+ }),
+ timestamp: 123,
+ },
+ ])
+
+ expect(producer.produce).toHaveBeenCalledTimes(2)
+ })
+
+ test('eachBatch can emit to two topics for a specific team', async () => {
+ const organizationId = await createOrganization(postgres)
+ const teamId = await createTeam(postgres, organizationId)
+
+ const eachBachWithDependencies: any = eachBatch({ producer, teamManager, summaryEnabledTeams: [teamId] })
+
+ await eachBachWithDependencies([
+ {
+ key: 'test',
+ value: JSON.stringify({
+ team_id: teamId,
+ data: JSON.stringify({
+ event: '$snapshot',
+ properties: { $snapshot_data: { events_summary: [{ timestamp: 12345 }] } },
+ }),
+ }),
+ timestamp: 123,
+ },
+ ])
+
+ expect(producer.produce).toHaveBeenCalledTimes(2)
+ })
+
+ test('eachBatch can emit to only one topic when team is not summary enabled', async () => {
+ const organizationId = await createOrganization(postgres)
+ const teamId = await createTeam(postgres, organizationId)
+
+ const eachBachWithDependencies: any = eachBatch({ producer, teamManager, summaryEnabledTeams: [teamId + 1] })
+
+ await eachBachWithDependencies([
+ {
+ key: 'test',
+ value: JSON.stringify({ team_id: teamId, data: JSON.stringify({ event: '$snapshot' }) }),
+ timestamp: 123,
+ },
+ ])
+
+ expect(producer.produce).toHaveBeenCalledTimes(1)
+ })
})
diff --git a/plugin-server/tests/main/process-event.test.ts b/plugin-server/tests/main/process-event.test.ts
index 7c970317e8a..9de8037ca77 100644
--- a/plugin-server/tests/main/process-event.test.ts
+++ b/plugin-server/tests/main/process-event.test.ts
@@ -11,6 +11,7 @@ import * as IORedis from 'ioredis'
import { DateTime } from 'luxon'
import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../../src/config/kafka-topics'
+import { RRWebEventSummary } from '../../src/main/ingestion-queues/session-recording/snapshot-segmenter'
import {
ClickHouseEvent,
Database,
@@ -29,7 +30,9 @@ import { EventPipelineRunner } from '../../src/worker/ingestion/event-pipeline/r
import {
createPerformanceEvent,
createSessionRecordingEvent,
+ createSessionReplayEvent,
EventsProcessor,
+ SummarizedSessionRecordingEvent,
} from '../../src/worker/ingestion/process-event'
import { delayUntilEventIngested, resetTestDatabaseClickhouse } from '../helpers/clickhouse'
import { resetKafka } from '../helpers/kafka'
@@ -1208,6 +1211,154 @@ test('snapshot event stored as session_recording_event', () => {
window_id: undefined,
})
})
+const sessionReplayEventTestCases: {
+ snapshotData: { events_summary: RRWebEventSummary[] }
+ expected: Pick<
+ SummarizedSessionRecordingEvent,
+ | 'click_count'
+ | 'keypress_count'
+ | 'mouse_activity_count'
+ | 'first_url'
+ | 'first_timestamp'
+ | 'last_timestamp'
+ | 'active_milliseconds'
+ >
+}[] = [
+ {
+ snapshotData: { events_summary: [{ timestamp: 1682449093469, type: 3, data: { source: 2 }, windowId: '1' }] },
+ expected: {
+ click_count: 1,
+ keypress_count: 0,
+ mouse_activity_count: 1,
+ first_url: undefined,
+ first_timestamp: '2023-04-25 18:58:13.469',
+ last_timestamp: '2023-04-25 18:58:13.469',
+ active_milliseconds: 1, // one event, but it's active, so active time is 1ms not 0
+ },
+ },
+ {
+ snapshotData: { events_summary: [{ timestamp: 1682449093469, type: 3, data: { source: 5 }, windowId: '1' }] },
+ expected: {
+ click_count: 0,
+ keypress_count: 1,
+ mouse_activity_count: 1,
+ first_url: undefined,
+ first_timestamp: '2023-04-25 18:58:13.469',
+ last_timestamp: '2023-04-25 18:58:13.469',
+ active_milliseconds: 1, // one event, but it's active, so active time is 1ms not 0
+ },
+ },
+ {
+ snapshotData: {
+ events_summary: [
+ {
+ timestamp: 1682449093693,
+ type: 5,
+ data: {
+ payload: {
+ // doesn't match because href is nested in payload
+ href: 'http://127.0.0.1:8000/home',
+ },
+ },
+ windowId: '1',
+ },
+ {
+ timestamp: 1682449093469,
+ type: 4,
+ data: {
+ href: 'http://127.0.0.1:8000/second/url',
+ },
+ windowId: '1',
+ },
+ ],
+ },
+ expected: {
+ click_count: 0,
+ keypress_count: 0,
+ mouse_activity_count: 0,
+ first_url: 'http://127.0.0.1:8000/second/url',
+ first_timestamp: '2023-04-25 18:58:13.469',
+ last_timestamp: '2023-04-25 18:58:13.693',
+ active_milliseconds: 0, // no data.source, so no activity
+ },
+ },
+ {
+ snapshotData: {
+ events_summary: [
+ // three windows with 1 second, 2 seconds, and 3 seconds of activity
+ // even though they overlap they should be summed separately
+ { timestamp: 1682449093000, type: 3, data: { source: 2 }, windowId: '1' },
+ { timestamp: 1682449094000, type: 3, data: { source: 2 }, windowId: '1' },
+ { timestamp: 1682449095000, type: 3, data: { source: 2 }, windowId: '2' },
+ { timestamp: 1682449097000, type: 3, data: { source: 2 }, windowId: '2' },
+ { timestamp: 1682449096000, type: 3, data: { source: 2 }, windowId: '3' },
+ { timestamp: 1682449099000, type: 3, data: { source: 2 }, windowId: '3' },
+ ],
+ },
+ expected: {
+ click_count: 6,
+ keypress_count: 0,
+ mouse_activity_count: 6,
+ first_url: undefined,
+ first_timestamp: '2023-04-25 18:58:13.000',
+ last_timestamp: '2023-04-25 18:58:19.000',
+ active_milliseconds: 6000, // can sum up the activity across windows
+ },
+ },
+]
+sessionReplayEventTestCases.forEach(({ snapshotData, expected }) => {
+ test(`snapshot event ${JSON.stringify(snapshotData)} can be stored as session_replay_event`, () => {
+ const data = createSessionReplayEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', '', {
+ $session_id: 'abcf-efg',
+ $snapshot_data: snapshotData,
+ } as any as Properties)
+
+ const expectedEvent: SummarizedSessionRecordingEvent = {
+ distinct_id: '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4',
+ session_id: 'abcf-efg',
+ team_id: 2,
+ uuid: 'some-id',
+ ...expected,
+ }
+ expect(data).toEqual(expectedEvent)
+ })
+})
+
+test(`snapshot event with no event summary is ignored`, () => {
+ const data = createSessionReplayEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', '', {
+ $session_id: 'abcf-efg',
+ $snapshot_data: {},
+ } as any as Properties)
+
+ expect(data).toEqual(null)
+})
+
+test(`snapshot event with no event summary timestamps is ignored`, () => {
+ const data = createSessionReplayEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', '', {
+ $session_id: 'abcf-efg',
+ $snapshot_data: {
+ events_summary: [
+ {
+ type: 5,
+ data: {
+ payload: {
+ // doesn't match because href is nested in payload
+ href: 'http://127.0.0.1:8000/home',
+ },
+ },
+ },
+ {
+ type: 4,
+ data: {
+ href: 'http://127.0.0.1:8000/second/url',
+ },
+ },
+ ],
+ },
+ } as any as Properties)
+
+ expect(data).toEqual(null)
+})
test('performance event stored as performance_event', () => {
const data = createPerformanceEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', {
diff --git a/posthog/clickhouse/migrations/0043_session_replay_events.py b/posthog/clickhouse/migrations/0043_session_replay_events.py
new file mode 100644
index 00000000000..1fde598e3ea
--- /dev/null
+++ b/posthog/clickhouse/migrations/0043_session_replay_events.py
@@ -0,0 +1,17 @@
+from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
+from posthog.models.session_replay_event.sql import (
+ SESSION_REPLAY_EVENTS_TABLE_MV_SQL,
+ KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL,
+ SESSION_REPLAY_EVENTS_TABLE_SQL,
+ DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL,
+ WRITABLE_SESSION_REPLAY_EVENTS_TABLE_SQL,
+)
+
+
+operations = [
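+ # order matters: the materialized view is created last because it reads from kafka_session_replay_events and writes to writable_session_replay_events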
+ run_sql_with_exceptions(WRITABLE_SESSION_REPLAY_EVENTS_TABLE_SQL()),
+ run_sql_with_exceptions(DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL()),
+ run_sql_with_exceptions(SESSION_REPLAY_EVENTS_TABLE_SQL()),
+ run_sql_with_exceptions(KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL()),
+ run_sql_with_exceptions(SESSION_REPLAY_EVENTS_TABLE_MV_SQL()),
+]
diff --git a/posthog/clickhouse/schema.py b/posthog/clickhouse/schema.py
index 909298b32ef..aa31fb4343a 100644
--- a/posthog/clickhouse/schema.py
+++ b/posthog/clickhouse/schema.py
@@ -28,6 +28,12 @@ from posthog.models.person_overrides.sql import (
PERSON_OVERRIDES_CREATE_TABLE_SQL,
)
from posthog.models.session_recording_event.sql import *
+from posthog.models.session_replay_event.sql import (
+ KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL,
+ DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL,
+ SESSION_REPLAY_EVENTS_TABLE_SQL,
+ SESSION_REPLAY_EVENTS_TABLE_MV_SQL,
+)
CREATE_MERGETREE_TABLE_QUERIES = (
CREATE_COHORTPEOPLE_TABLE_SQL,
@@ -44,6 +50,7 @@ CREATE_MERGETREE_TABLE_QUERIES = (
INGESTION_WARNINGS_DATA_TABLE_SQL,
APP_METRICS_DATA_TABLE_SQL,
PERFORMANCE_EVENTS_TABLE_SQL,
+ SESSION_REPLAY_EVENTS_TABLE_SQL,
)
CREATE_DISTRIBUTED_TABLE_QUERIES = (
WRITABLE_EVENTS_TABLE_SQL,
@@ -54,6 +61,7 @@ CREATE_DISTRIBUTED_TABLE_QUERIES = (
DISTRIBUTED_APP_METRICS_TABLE_SQL,
WRITABLE_PERFORMANCE_EVENTS_TABLE_SQL,
DISTRIBUTED_PERFORMANCE_EVENTS_TABLE_SQL,
+ DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL,
)
CREATE_KAFKA_TABLE_QUERIES = (
KAFKA_DEAD_LETTER_QUEUE_TABLE_SQL,
@@ -68,6 +76,7 @@ CREATE_KAFKA_TABLE_QUERIES = (
KAFKA_INGESTION_WARNINGS_TABLE_SQL,
KAFKA_APP_METRICS_TABLE_SQL,
KAFKA_PERFORMANCE_EVENTS_TABLE_SQL,
+ KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL,
)
CREATE_MV_TABLE_QUERIES = (
DEAD_LETTER_QUEUE_TABLE_MV_SQL,
@@ -82,6 +91,7 @@ CREATE_MV_TABLE_QUERIES = (
INGESTION_WARNINGS_MV_TABLE_SQL,
APP_METRICS_MV_TABLE_SQL,
PERFORMANCE_EVENTS_TABLE_MV_SQL,
+ SESSION_REPLAY_EVENTS_TABLE_MV_SQL,
)
CREATE_TABLE_QUERIES = (
diff --git a/posthog/clickhouse/test/__snapshots__/test_schema.ambr b/posthog/clickhouse/test/__snapshots__/test_schema.ambr
index 844c73783a8..5011ce5fd2b 100644
--- a/posthog/clickhouse/test/__snapshots__/test_schema.ambr
+++ b/posthog/clickhouse/test/__snapshots__/test_schema.ambr
@@ -318,6 +318,25 @@
'
---
+# name: test_create_kafka_table_with_different_kafka_host[kafka_session_replay_events]
+ '
+
+ CREATE TABLE IF NOT EXISTS kafka_session_replay_events ON CLUSTER 'posthog'
+ (
+ session_id VARCHAR,
+ team_id Int64,
+ distinct_id VARCHAR,
+ first_timestamp DateTime64(6, 'UTC'),
+ last_timestamp DateTime64(6, 'UTC'),
+ first_url Nullable(VARCHAR),
+ click_count Int64,
+ keypress_count Int64,
+ mouse_activity_count Int64,
+ active_milliseconds Int64
+ ) ENGINE = Kafka('test.kafka.broker:9092', 'clickhouse_session_replay_events_test', 'group1', 'JSONEachRow')
+
+ '
+---
# name: test_create_table_query[app_metrics]
'
@@ -880,6 +899,25 @@
'
---
+# name: test_create_table_query[kafka_session_replay_events]
+ '
+
+ CREATE TABLE IF NOT EXISTS kafka_session_replay_events ON CLUSTER 'posthog'
+ (
+ session_id VARCHAR,
+ team_id Int64,
+ distinct_id VARCHAR,
+ first_timestamp DateTime64(6, 'UTC'),
+ last_timestamp DateTime64(6, 'UTC'),
+ first_url Nullable(VARCHAR),
+ click_count Int64,
+ keypress_count Int64,
+ mouse_activity_count Int64,
+ active_milliseconds Int64
+ ) ENGINE = Kafka('kafka:9092', 'clickhouse_session_replay_events_test', 'group1', 'JSONEachRow')
+
+ '
+---
# name: test_create_table_query[performance_events]
'
@@ -1273,6 +1311,60 @@
'
---
+# name: test_create_table_query[session_replay_events]
+ '
+
+ CREATE TABLE IF NOT EXISTS session_replay_events ON CLUSTER 'posthog'
+ (
+ -- part of order by so will aggregate correctly
+ session_id VARCHAR,
+ -- part of order by so will aggregate correctly
+ team_id Int64,
+ -- ClickHouse will pick any value of distinct_id for the session
+ -- this is fine since even if the distinct_id changes during a session
+ -- it will still (or should still) map to the same person
+ distinct_id VARCHAR,
+ min_first_timestamp SimpleAggregateFunction(min, DateTime64(6, 'UTC')),
+ max_last_timestamp SimpleAggregateFunction(max, DateTime64(6, 'UTC')),
+ first_url AggregateFunction(argMin, Nullable(VARCHAR), DateTime64(6, 'UTC')),
+ click_count SimpleAggregateFunction(sum, Int64),
+ keypress_count SimpleAggregateFunction(sum, Int64),
+ mouse_activity_count SimpleAggregateFunction(sum, Int64),
+ active_milliseconds SimpleAggregateFunction(sum, Int64)
+ ) ENGINE = Distributed('posthog', 'posthog_test', 'sharded_session_replay_events', sipHash64(distinct_id))
+
+ '
+---
+# name: test_create_table_query[session_replay_events_mv]
+ '
+
+ CREATE MATERIALIZED VIEW IF NOT EXISTS session_replay_events_mv ON CLUSTER 'posthog'
+ TO posthog_test.writable_session_replay_events
+ AS SELECT
+ session_id,
+ team_id,
+ any(distinct_id) as distinct_id,
+ min(first_timestamp) AS min_first_timestamp,
+ max(last_timestamp) AS max_last_timestamp,
+ -- TRICKY: ClickHouse will pick a relatively random first_url
+ -- when it collapses the aggregating merge tree
+ -- unless we teach it what we want...
+ -- argMin ignores null values
+ -- so this will get the first non-null value of first_url
+ -- for each group of session_id and team_id
+ -- by min of first_timestamp in the batch
+ -- this is an aggregate function, not a simple aggregate function
+ -- so we have to write to argMinState, and query with argMinMerge
+ argMinState(first_url, first_timestamp) as first_url,
+ sum(click_count) as click_count,
+ sum(keypress_count) as keypress_count,
+ sum(mouse_activity_count) as mouse_activity_count,
+ sum(active_milliseconds) as active_milliseconds
+ FROM posthog_test.kafka_session_replay_events
+ group by session_id, team_id
+
+ '
+---
# name: test_create_table_query[sharded_app_metrics]
'
@@ -1475,6 +1567,44 @@
'
---
+# name: test_create_table_query[sharded_session_replay_events]
+ '
+
+ CREATE TABLE IF NOT EXISTS sharded_session_replay_events ON CLUSTER 'posthog'
+ (
+ -- part of order by so will aggregate correctly
+ session_id VARCHAR,
+ -- part of order by so will aggregate correctly
+ team_id Int64,
+ -- ClickHouse will pick any value of distinct_id for the session
+ -- this is fine since even if the distinct_id changes during a session
+ -- it will still (or should still) map to the same person
+ distinct_id VARCHAR,
+ min_first_timestamp SimpleAggregateFunction(min, DateTime64(6, 'UTC')),
+ max_last_timestamp SimpleAggregateFunction(max, DateTime64(6, 'UTC')),
+ first_url AggregateFunction(argMin, Nullable(VARCHAR), DateTime64(6, 'UTC')),
+ click_count SimpleAggregateFunction(sum, Int64),
+ keypress_count SimpleAggregateFunction(sum, Int64),
+ mouse_activity_count SimpleAggregateFunction(sum, Int64),
+ active_milliseconds SimpleAggregateFunction(sum, Int64)
+ ) ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.session_replay_events', '{replica}')
+
+ PARTITION BY toYYYYMM(min_first_timestamp)
+ -- order by is used by the aggregating merge tree engine to
+ -- identify candidates to merge, e.g. toDate(min_first_timestamp)
+ -- would mean we would have one row per day per session_id
+ -- if CH could completely merge to match the order by
+ -- it is also used to organise data to make queries faster
+ -- we want the fewest rows possible but also the fastest queries
+ -- since we query by date and not by time
+ -- and order by must be in order of increasing cardinality
+ -- so we order by date first, then team_id, then session_id
+ -- hopefully, this is a good balance between the two
+ ORDER BY (toDate(min_first_timestamp), team_id, session_id)
+ SETTINGS index_granularity=512
+
+ '
+---
# name: test_create_table_query[writable_events]
'
@@ -2051,3 +2181,41 @@
'
---
+# name: test_create_table_query_replicated_and_storage[sharded_session_replay_events]
+ '
+
+ CREATE TABLE IF NOT EXISTS sharded_session_replay_events ON CLUSTER 'posthog'
+ (
+ -- part of order by so will aggregate correctly
+ session_id VARCHAR,
+ -- part of order by so will aggregate correctly
+ team_id Int64,
+ -- ClickHouse will pick any value of distinct_id for the session
+ -- this is fine since even if the distinct_id changes during a session
+ -- it will still (or should still) map to the same person
+ distinct_id VARCHAR,
+ min_first_timestamp SimpleAggregateFunction(min, DateTime64(6, 'UTC')),
+ max_last_timestamp SimpleAggregateFunction(max, DateTime64(6, 'UTC')),
+ first_url AggregateFunction(argMin, Nullable(VARCHAR), DateTime64(6, 'UTC')),
+ click_count SimpleAggregateFunction(sum, Int64),
+ keypress_count SimpleAggregateFunction(sum, Int64),
+ mouse_activity_count SimpleAggregateFunction(sum, Int64),
+ active_milliseconds SimpleAggregateFunction(sum, Int64)
+ ) ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.session_replay_events', '{replica}')
+
+ PARTITION BY toYYYYMM(min_first_timestamp)
+ -- order by is used by the aggregating merge tree engine to
+ -- identify candidates to merge, e.g. toDate(min_first_timestamp)
+ -- would mean we would have one row per day per session_id
+ -- if CH could completely merge to match the order by
+ -- it is also used to organise data to make queries faster
+ -- we want the fewest rows possible but also the fastest queries
+ -- since we query by date and not by time
+ -- and order by must be in order of increasing cardinality
+ -- so we order by date first, then team_id, then session_id
+ -- hopefully, this is a good balance between the two
+ ORDER BY (toDate(min_first_timestamp), team_id, session_id)
+ SETTINGS index_granularity=512
+
+ '
+---
diff --git a/posthog/kafka_client/topics.py b/posthog/kafka_client/topics.py
index 1e70be20666..d9bd032305b 100644
--- a/posthog/kafka_client/topics.py
+++ b/posthog/kafka_client/topics.py
@@ -9,6 +9,7 @@ KAFKA_PERSON = f"{KAFKA_PREFIX}clickhouse_person{SUFFIX}"
KAFKA_PERSON_UNIQUE_ID = f"{KAFKA_PREFIX}clickhouse_person_unique_id{SUFFIX}" # DEPRECATED_DO_NOT_USE
KAFKA_PERSON_DISTINCT_ID = f"{KAFKA_PREFIX}clickhouse_person_distinct_id{SUFFIX}"
KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = f"{KAFKA_PREFIX}clickhouse_session_recording_events{SUFFIX}"
+KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = f"{KAFKA_PREFIX}clickhouse_session_replay_events{SUFFIX}"
KAFKA_PERFORMANCE_EVENTS = f"{KAFKA_PREFIX}clickhouse_performance_events{SUFFIX}"
KAFKA_PLUGIN_LOG_ENTRIES = f"{KAFKA_PREFIX}plugin_log_entries{SUFFIX}"
KAFKA_DEAD_LETTER_QUEUE = f"{KAFKA_PREFIX}events_dead_letter_queue{SUFFIX}"
diff --git a/posthog/models/session_replay_event/sql.py b/posthog/models/session_replay_event/sql.py
new file mode 100644
index 00000000000..64c13029e41
--- /dev/null
+++ b/posthog/models/session_replay_event/sql.py
@@ -0,0 +1,136 @@
+from django.conf import settings
+
+from posthog.clickhouse.kafka_engine import kafka_engine
+from posthog.clickhouse.table_engines import Distributed, ReplicationScheme, AggregatingMergeTree
+from posthog.kafka_client.topics import KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS
+
+SESSION_REPLAY_EVENTS_DATA_TABLE = lambda: "sharded_session_replay_events"
+
+"""
+Kafka needs a slightly different column setup. It receives individual events, not aggregates.
+We write first_timestamp and last_timestamp as individual records;
+they will be grouped as min_first_timestamp and max_last_timestamp in the main table.
+"""
+KAFKA_SESSION_REPLAY_EVENTS_TABLE_BASE_SQL = """
+CREATE TABLE IF NOT EXISTS {table_name} ON CLUSTER '{cluster}'
+(
+ session_id VARCHAR,
+ team_id Int64,
+ distinct_id VARCHAR,
+ first_timestamp DateTime64(6, 'UTC'),
+ last_timestamp DateTime64(6, 'UTC'),
+ first_url Nullable(VARCHAR),
+ click_count Int64,
+ keypress_count Int64,
+ mouse_activity_count Int64,
+ active_milliseconds Int64
+) ENGINE = {engine}
+"""
+
+SESSION_REPLAY_EVENTS_TABLE_BASE_SQL = """
+CREATE TABLE IF NOT EXISTS {table_name} ON CLUSTER '{cluster}'
+(
+ -- part of order by so will aggregate correctly
+ session_id VARCHAR,
+ -- part of order by so will aggregate correctly
+ team_id Int64,
+ -- ClickHouse will pick any value of distinct_id for the session
+ -- this is fine since even if the distinct_id changes during a session
+ -- it will still (or should still) map to the same person
+ distinct_id VARCHAR,
+ min_first_timestamp SimpleAggregateFunction(min, DateTime64(6, 'UTC')),
+ max_last_timestamp SimpleAggregateFunction(max, DateTime64(6, 'UTC')),
+ first_url AggregateFunction(argMin, Nullable(VARCHAR), DateTime64(6, 'UTC')),
+ click_count SimpleAggregateFunction(sum, Int64),
+ keypress_count SimpleAggregateFunction(sum, Int64),
+ mouse_activity_count SimpleAggregateFunction(sum, Int64),
+ active_milliseconds SimpleAggregateFunction(sum, Int64)
+) ENGINE = {engine}
+"""
+
+
+SESSION_REPLAY_EVENTS_DATA_TABLE_ENGINE = lambda: AggregatingMergeTree(
+ "session_replay_events", replication_scheme=ReplicationScheme.SHARDED
+)
+
+SESSION_REPLAY_EVENTS_TABLE_SQL = lambda: (
+ SESSION_REPLAY_EVENTS_TABLE_BASE_SQL
+ + """
+ PARTITION BY toYYYYMM(min_first_timestamp)
+ -- order by is used by the aggregating merge tree engine to
+ -- identify candidates to merge, e.g. toDate(min_first_timestamp)
+ -- would mean we would have one row per day per session_id
+ -- if CH could completely merge to match the order by
+ -- it is also used to organise data to make queries faster
+ -- we want the fewest rows possible but also the fastest queries
+ -- since we query by date and not by time
+ -- and order by must be in order of increasing cardinality
+ -- so we order by date first, then team_id, then session_id
+ -- hopefully, this is a good balance between the two
+ ORDER BY (toDate(min_first_timestamp), team_id, session_id)
+SETTINGS index_granularity=512
+"""
+).format(
+ table_name=SESSION_REPLAY_EVENTS_DATA_TABLE(),
+ cluster=settings.CLICKHOUSE_CLUSTER,
+ engine=SESSION_REPLAY_EVENTS_DATA_TABLE_ENGINE(),
+)
+
+KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL = lambda: KAFKA_SESSION_REPLAY_EVENTS_TABLE_BASE_SQL.format(
+ table_name="kafka_session_replay_events",
+ cluster=settings.CLICKHOUSE_CLUSTER,
+ engine=kafka_engine(topic=KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS),
+)
+
+SESSION_REPLAY_EVENTS_TABLE_MV_SQL = lambda: """
+CREATE MATERIALIZED VIEW IF NOT EXISTS session_replay_events_mv ON CLUSTER '{cluster}'
+TO {database}.{target_table}
+AS SELECT
+session_id,
+team_id,
+any(distinct_id) as distinct_id,
+min(first_timestamp) AS min_first_timestamp,
+max(last_timestamp) AS max_last_timestamp,
+-- TRICKY: ClickHouse will pick a relatively random first_url
+-- when it collapses the aggregating merge tree
+-- unless we teach it what we want...
+-- argMin ignores null values
+-- so this will get the first non-null value of first_url
+-- for each group of session_id and team_id
+-- by min of first_timestamp in the batch
+-- this is an aggregate function, not a simple aggregate function
+-- so we have to write to argMinState, and query with argMinMerge
+argMinState(first_url, first_timestamp) as first_url,
+sum(click_count) as click_count,
+sum(keypress_count) as keypress_count,
+sum(mouse_activity_count) as mouse_activity_count,
+sum(active_milliseconds) as active_milliseconds
+FROM {database}.kafka_session_replay_events
+group by session_id, team_id
+""".format(
+ target_table="writable_session_replay_events",
+ cluster=settings.CLICKHOUSE_CLUSTER,
+ database=settings.CLICKHOUSE_DATABASE,
+)
+
+
+# Distributed engine tables are only created if CLICKHOUSE_REPLICATED
+
+# This table is responsible for writing to sharded_session_replay_events based on a sharding key.
+WRITABLE_SESSION_REPLAY_EVENTS_TABLE_SQL = lambda: SESSION_REPLAY_EVENTS_TABLE_BASE_SQL.format(
+ table_name="writable_session_replay_events",
+ cluster=settings.CLICKHOUSE_CLUSTER,
+ engine=Distributed(data_table=SESSION_REPLAY_EVENTS_DATA_TABLE(), sharding_key="sipHash64(distinct_id)"),
+)
+
+# This table is responsible for reading from session_replay_events on a cluster setting
+DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL = lambda: SESSION_REPLAY_EVENTS_TABLE_BASE_SQL.format(
+ table_name="session_replay_events",
+ cluster=settings.CLICKHOUSE_CLUSTER,
+ engine=Distributed(data_table=SESSION_REPLAY_EVENTS_DATA_TABLE(), sharding_key="sipHash64(distinct_id)"),
+)
+
+
+DROP_SESSION_REPLAY_EVENTS_TABLE_SQL = lambda: (
+ f"DROP TABLE IF EXISTS {SESSION_REPLAY_EVENTS_DATA_TABLE()} ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'"
+)
diff --git a/posthog/queries/session_recordings/test/session_replay_sql.py b/posthog/queries/session_recordings/test/session_replay_sql.py
new file mode 100644
index 00000000000..528d26e3665
--- /dev/null
+++ b/posthog/queries/session_recordings/test/session_replay_sql.py
@@ -0,0 +1,119 @@
+from datetime import datetime
+from typing import Optional
+
+from dateutil.parser import parse
+from dateutil.relativedelta import relativedelta
+
+from posthog.kafka_client.client import ClickhouseProducer
+from posthog.kafka_client.topics import KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS
+from posthog.models.event.util import format_clickhouse_timestamp
+from posthog.utils import cast_timestamp_or_now
+
+INSERT_SINGLE_SESSION_REPLAY = """
+INSERT INTO sharded_session_replay_events (
+ session_id,
+ team_id,
+ distinct_id,
+ min_first_timestamp,
+ max_last_timestamp,
+ first_url,
+ click_count,
+ keypress_count,
+ mouse_activity_count,
+ active_milliseconds
+)
+SELECT
+ %(session_id)s,
+ %(team_id)s,
+ %(distinct_id)s,
+ toDateTime64(%(first_timestamp)s, 6, 'UTC'),
+ toDateTime64(%(last_timestamp)s, 6, 'UTC'),
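+ -- first_url is an AggregateFunction(argMin, ...) column, so we insert an argMinState rather than a plain value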
+ argMinState(cast(%(first_url)s, 'Nullable(String)'), toDateTime64(%(first_timestamp)s, 6, 'UTC')),
+ %(click_count)s,
+ %(keypress_count)s,
+ %(mouse_activity_count)s,
+ %(active_milliseconds)s
+"""
+
+
+def _sensible_first_timestamp(
+ first_timestamp: Optional[str | datetime], last_timestamp: Optional[str | datetime]
+) -> str:
+ """
+ Normalise the first timestamp to be used in the session replay summary.
+ If it is not provided but there is a last_timestamp, use an hour before that last_timestamp.
+ Otherwise use the current time.
+ """
+ sensible_timestamp = None
+ if first_timestamp is not None:
+ # TRICKY: check it is not a string to avoid needing to check whether it is a datetime or a FakeDatetime
+ if not isinstance(first_timestamp, str):
+ sensible_timestamp = first_timestamp.isoformat()
+ else:
+ sensible_timestamp = first_timestamp
+ else:
+ if last_timestamp is not None:
+ if isinstance(last_timestamp, str):
+ last_timestamp = parse(last_timestamp)
+
+ sensible_timestamp = (last_timestamp - relativedelta(seconds=3600)).isoformat()
+
+ return format_clickhouse_timestamp(cast_timestamp_or_now(sensible_timestamp))
+
+
+def _sensible_last_timestamp(
+ first_timestamp: Optional[str | datetime], last_timestamp: Optional[str | datetime]
+) -> str:
+ """
+ Normalise the last timestamp to be used in the session replay summary.
+ If it is not provided but there is a first_timestamp, use an hour after that first_timestamp.
+ Otherwise use the current time.
+ """
+ sensible_timestamp = None
+ if last_timestamp is not None:
+ # TRICKY: check it is not a string to avoid needing to check whether it is a datetime or a FakeDatetime
+ if not isinstance(last_timestamp, str):
+ sensible_timestamp = last_timestamp.isoformat()
+ else:
+ sensible_timestamp = last_timestamp
+ else:
+ if first_timestamp is not None:
+ if isinstance(first_timestamp, str):
+ first_timestamp = parse(first_timestamp)
+
+ sensible_timestamp = (first_timestamp + relativedelta(seconds=3600)).isoformat()
+
+ return format_clickhouse_timestamp(cast_timestamp_or_now(sensible_timestamp))
+
+
+def produce_replay_summary(
+ team_id: int,
+ session_id: Optional[str] = None,
+ distinct_id: Optional[str] = None,
+ first_timestamp: Optional[str | datetime] = None,
+ last_timestamp: Optional[str | datetime] = None,
+ first_url: Optional[str] = None,
+ click_count: Optional[int] = None,
+ keypress_count: Optional[int] = None,
+ mouse_activity_count: Optional[int] = None,
+ active_milliseconds: Optional[float] = None,
+):
+
+ first_timestamp = _sensible_first_timestamp(first_timestamp, last_timestamp)
+ last_timestamp = _sensible_last_timestamp(first_timestamp, last_timestamp)
+
+ data = {
+ "session_id": session_id or "1",
+ "team_id": team_id,
+ "distinct_id": distinct_id or "user",
+ "first_timestamp": format_clickhouse_timestamp(cast_timestamp_or_now(first_timestamp)),
+ "last_timestamp": format_clickhouse_timestamp(cast_timestamp_or_now(last_timestamp)),
+ "first_url": first_url,
+ "click_count": click_count or 0,
+ "keypress_count": keypress_count or 0,
+ "mouse_activity_count": mouse_activity_count or 0,
+ "active_milliseconds": active_milliseconds or 0,
+ }
+ p = ClickhouseProducer()
+ # because this is in a test it writes directly using SQL rather than actually going through Kafka
+ p.produce(topic=KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS, sql=INSERT_SINGLE_SESSION_REPLAY, data=data)
diff --git a/posthog/queries/session_recordings/test/test_session_replay_summaries.py b/posthog/queries/session_recordings/test/test_session_replay_summaries.py
new file mode 100644
index 00000000000..8c8fad0fa2f
--- /dev/null
+++ b/posthog/queries/session_recordings/test/test_session_replay_summaries.py
@@ -0,0 +1,157 @@
+from datetime import datetime, timedelta
+from uuid import uuid4
+
+import pytz
+from dateutil.parser import isoparse
+
+from posthog.clickhouse.client import sync_execute
+from posthog.models import Team
+from posthog.models.event.util import format_clickhouse_timestamp
+from posthog.queries.app_metrics.serializers import AppMetricsRequestSerializer
+from posthog.queries.session_recordings.test.session_replay_sql import produce_replay_summary
+from posthog.test.base import BaseTest, ClickhouseTestMixin, snapshot_clickhouse_queries
+
+
+def make_filter(serializer_klass=AppMetricsRequestSerializer, **kwargs) -> AppMetricsRequestSerializer:
+ filter = serializer_klass(data=kwargs)
+ filter.is_valid(raise_exception=True)
+ return filter
+
+
+class SessionReplaySummaryQuery:
+ def __init__(self, team: Team, session_id: str, reference_date: str):
+ self.team = team
+ self.session_id = session_id
+ self.reference_date = reference_date
+
+ def list_all(self):
+ params = {
+ "team_id": self.team.pk,
+ "start_time": format_clickhouse_timestamp(isoparse(self.reference_date) - timedelta(hours=48)),
+ "end_time": format_clickhouse_timestamp(isoparse(self.reference_date) + timedelta(hours=48)),
+ "session_ids": (self.session_id,),
+ }
+
+ results = sync_execute(
+ """
+ select
+ session_id,
+ any(team_id),
+ any(distinct_id),
+ min(min_first_timestamp),
+ max(max_last_timestamp),
+ dateDiff('SECOND', min(min_first_timestamp), max(max_last_timestamp)) as duration,
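+ -- first_url is written as an argMin state by the ingestion materialized view, so read it back with argMinMerge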
+ argMinMerge(first_url) as first_url,
+ sum(click_count),
+ sum(keypress_count),
+ sum(mouse_activity_count),
+ round((sum(active_milliseconds)/1000)/duration, 2) as active_time
+ from session_replay_events
+ prewhere team_id = %(team_id)s
+ and min_first_timestamp >= %(start_time)s
+ and max_last_timestamp <= %(end_time)s
+ and session_id in %(session_ids)s
+ group by session_id
+ """,
+ params,
+ )
+ return results
+
+
+class TestReceiveSummarizedSessionReplays(ClickhouseTestMixin, BaseTest):
+ @snapshot_clickhouse_queries
+ def test_session_replay_summaries_can_be_queried(self):
+ session_id = str(uuid4())
+
+ produce_replay_summary(
+ session_id=session_id,
+ team_id=self.team.pk,
+ # checks that ClickHouse can handle a timestamp without the T separator
+ first_timestamp="2023-04-27 10:00:00.309",
+ last_timestamp="2023-04-27 14:20:42.237",
+ distinct_id=str(self.user.distinct_id),
+ first_url="https://first-url-ingested.com",
+ click_count=2,
+ keypress_count=2,
+ mouse_activity_count=2,
+ active_milliseconds=33624 * 1000 * 0.3, # 30% of the total expected duration
+ )
+
+ produce_replay_summary(
+ session_id=session_id,
+ team_id=self.team.pk,
+ first_timestamp="2023-04-27T19:17:38.116",
+ last_timestamp="2023-04-27T19:17:38.117",
+ distinct_id=str(self.user.distinct_id),
+ first_url="https://second-url-ingested.com",
+ click_count=2,
+ keypress_count=2,
+ mouse_activity_count=2,
+ )
+ produce_replay_summary(
+ session_id=session_id,
+ team_id=self.team.pk,
+ first_timestamp="2023-04-27T19:18:24.597",
+ last_timestamp="2023-04-27T19:20:24.597",
+ distinct_id=str(self.user.distinct_id),
+ first_url="https://third-url-ingested.com",
+ click_count=2,
+ keypress_count=2,
+ mouse_activity_count=2,
+ )
+
+ # same session but ends more than 2 days from start so excluded
+ produce_replay_summary(
+ session_id=session_id,
+ team_id=self.team.pk,
+ first_timestamp="2023-04-26T19:18:24.597",
+ last_timestamp="2023-04-29T19:20:24.597",
+ distinct_id=str(self.user.distinct_id),
+ first_url=None,
+ click_count=2,
+ keypress_count=2,
+ mouse_activity_count=2,
+ )
+
+ # same session but a different team so excluded
+ produce_replay_summary(
+ session_id=session_id,
+ team_id=self.team.pk + 100,
+ first_timestamp="2023-04-26T19:18:24.597",
+ last_timestamp="2023-04-28T19:20:24.597",
+ distinct_id=str(self.user.distinct_id),
+ first_url=None,
+ click_count=2,
+ keypress_count=2,
+ mouse_activity_count=2,
+ )
+
+ # different session so excluded
+ produce_replay_summary(
+ session_id=str(uuid4()),
+ team_id=self.team.pk,
+ first_timestamp="2023-04-26T19:18:24.597",
+ last_timestamp="2023-04-26T19:20:24.597",
+ distinct_id=str(self.user.distinct_id),
+ first_url=None,
+ click_count=2,
+ keypress_count=2,
+ mouse_activity_count=2,
+ )
+
+ results = SessionReplaySummaryQuery(self.team, session_id, "2023-04-26T19:18:24.597").list_all()
+ assert results == [
+ (
+ session_id,
+ self.team.pk,
+ str(self.user.distinct_id),
+ datetime(2023, 4, 27, 10, 0, 0, 309000, tzinfo=pytz.UTC),
+ datetime(2023, 4, 27, 19, 20, 24, 597000, tzinfo=pytz.UTC),
+ 33624,
+ "https://first-url-ingested.com",
+ 6,
+ 6,
+ 6,
+ 0.3,
+ )
+ ]
diff --git a/posthog/test/base.py b/posthog/test/base.py
index f13d170d0cf..549488c0e5e 100644
--- a/posthog/test/base.py
+++ b/posthog/test/base.py
@@ -42,6 +42,11 @@ from posthog.models.session_recording_event.sql import (
DROP_SESSION_RECORDING_EVENTS_TABLE_SQL,
SESSION_RECORDING_EVENTS_TABLE_SQL,
)
+from posthog.models.session_replay_event.sql import (
+ SESSION_REPLAY_EVENTS_TABLE_SQL,
+ DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL,
+ DROP_SESSION_REPLAY_EVENTS_TABLE_SQL,
+)
from posthog.settings.utils import get_from_env, str_to_bool
persons_cache_tests: List[Dict[str, Any]] = []
@@ -660,6 +665,7 @@ class ClickhouseDestroyTablesMixin(BaseTest):
TRUNCATE_PERSON_DISTINCT_ID_TABLE_SQL,
TRUNCATE_PERSON_DISTINCT_ID2_TABLE_SQL,
DROP_SESSION_RECORDING_EVENTS_TABLE_SQL(),
+ DROP_SESSION_REPLAY_EVENTS_TABLE_SQL(),
TRUNCATE_GROUPS_TABLE_SQL,
TRUNCATE_COHORTPEOPLE_TABLE_SQL,
TRUNCATE_PERSON_STATIC_COHORT_TABLE_SQL,
@@ -667,10 +673,19 @@ class ClickhouseDestroyTablesMixin(BaseTest):
]
)
run_clickhouse_statement_in_parallel(
- [EVENTS_TABLE_SQL(), PERSONS_TABLE_SQL(), SESSION_RECORDING_EVENTS_TABLE_SQL()]
+ [
+ EVENTS_TABLE_SQL(),
+ PERSONS_TABLE_SQL(),
+ SESSION_RECORDING_EVENTS_TABLE_SQL(),
+ SESSION_REPLAY_EVENTS_TABLE_SQL(),
+ ]
)
run_clickhouse_statement_in_parallel(
- [DISTRIBUTED_EVENTS_TABLE_SQL(), DISTRIBUTED_SESSION_RECORDING_EVENTS_TABLE_SQL()]
+ [
+ DISTRIBUTED_EVENTS_TABLE_SQL(),
+ DISTRIBUTED_SESSION_RECORDING_EVENTS_TABLE_SQL(),
+ DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL(),
+ ]
)
def tearDown(self):
@@ -682,13 +697,24 @@ class ClickhouseDestroyTablesMixin(BaseTest):
DROP_PERSON_TABLE_SQL,
TRUNCATE_PERSON_DISTINCT_ID_TABLE_SQL,
DROP_SESSION_RECORDING_EVENTS_TABLE_SQL(),
+ DROP_SESSION_REPLAY_EVENTS_TABLE_SQL(),
+ ]
+ )
+
+ run_clickhouse_statement_in_parallel(
+ [
+ EVENTS_TABLE_SQL(),
+ PERSONS_TABLE_SQL(),
+ SESSION_RECORDING_EVENTS_TABLE_SQL(),
+ SESSION_REPLAY_EVENTS_TABLE_SQL(),
]
)
run_clickhouse_statement_in_parallel(
- [EVENTS_TABLE_SQL(), PERSONS_TABLE_SQL(), SESSION_RECORDING_EVENTS_TABLE_SQL()]
- )
- run_clickhouse_statement_in_parallel(
- [DISTRIBUTED_EVENTS_TABLE_SQL(), DISTRIBUTED_SESSION_RECORDING_EVENTS_TABLE_SQL()]
+ [
+ DISTRIBUTED_EVENTS_TABLE_SQL(),
+ DISTRIBUTED_SESSION_RECORDING_EVENTS_TABLE_SQL(),
+ DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL(),
+ ]
)