
feat: write recording summary events (#15245)

Problem
see #15200 (comment)

When we store session recording events, we materialize a lot of information using the snapshot data column.

We'll soon stop storing the snapshot data, so we won't be able to materialize that information from it; we need to capture it earlier in the pipeline. Since this is only used for searching for and summarizing recordings, we don't need to store every event.

Changes
We'll push a summary event to a new Kafka topic during ingestion. ClickHouse can ingest from that topic into an AggregatingMergeTree table, so that we store (in theory, although not always in practice) only one row per session. An example payload is sketched after the list below.

Add config to turn this on and off per team in the plugin server
Add code behind that config to write session recording summary events to a new Kafka topic
Add ClickHouse tables to ingest and aggregate those summary events
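
For illustration only (not from this commit): a minimal sketch of one summarized replay event as it would land on the new topic. The field names follow the SummarizedSessionRecordingEvent interface added in the plugin server below; the values are made up.

import json

# One summarized session replay event as produced to the new Kafka topic.
# ClickHouse's Kafka engine table reads it as JSONEachRow.
summary_event = {
    "uuid": "some-id",
    "team_id": 2,
    "distinct_id": "user-distinct-id",
    "session_id": "abcf-efg",
    "first_timestamp": "2023-04-25 18:58:13.469",
    "last_timestamp": "2023-04-25 18:58:19.000",
    "first_url": "http://127.0.0.1:8000/home",
    "click_count": 6,
    "keypress_count": 0,
    "mouse_activity_count": 6,
    "active_milliseconds": 6000,
}
print(json.dumps(summary_event))
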
Paul D'Ambra 2023-05-09 15:41:16 +01:00 committed by GitHub
parent 26bc7ba5d5
commit 067d73cb4f
22 changed files with 1146 additions and 28 deletions

View File

@ -117,8 +117,7 @@ jobs:
if: steps.cache-backend-tests.outputs.cache-hit != 'true'
run: |
cd current
python -m pip install -r requirements-dev.txt
python -m pip install -r requirements.txt
python -m pip install -r requirements.txt -r requirements-dev.txt
- name: Check for syntax errors, import sort, and code style violations
run: |
@ -199,8 +198,7 @@ jobs:
- name: Install python dependencies
if: steps.cache-backend-tests.outputs.cache-hit != 'true'
run: |
python -m pip install -r requirements-dev.txt
python -m pip install -r requirements.txt
python -m pip install -r requirements.txt -r requirements-dev.txt
- uses: actions/checkout@v3
with:
@ -210,8 +208,7 @@ jobs:
run: |
# We need to ensure we have requirements for the master branch
# now also, so we can run migrations up to master.
python -m pip install -r requirements-dev.txt
python -m pip install -r requirements.txt
python -m pip install -r requirements.txt -r requirements-dev.txt
python manage.py migrate
- uses: actions/checkout@v3
@ -356,8 +353,7 @@ jobs:
- name: Install python dependencies
if: steps.cache-backend-tests.outputs.cache-hit != 'true'
run: |
python -m pip install -r master/requirements-dev.txt
python -m pip install -r master/requirements.txt
python -m pip install -r master/requirements.txt -r master/requirements-dev.txt
- name: Wait for Clickhouse & Kafka
run: master/bin/check_kafka_clickhouse_up
@ -376,9 +372,7 @@ jobs:
- name: Install requirements.txt dependencies with pip at current branch
run: |
cd current
python -m pip install --upgrade pip
python -m pip install -r requirements.txt
python -m pip install freezegun fakeredis pytest pytest-mock pytest-django syrupy
python -m pip install -r requirements.txt -r requirements-dev.txt
- name: Link posthog-cloud at current branch
run: |

View File

@ -61,6 +61,7 @@ jobs:
export RELEASE_NAME=posthog
export NAMESPACE=pr-$PR_NUM-${BRANCH_NAME//\//-}
export NAMESPACE=${NAMESPACE:0:38}
export NAMESPACE=${NAMESPACE%%-}
export HOSTNAME=$NAMESPACE
export TAILNET_NAME=hedgehog-kitefin
export TS_AUTHKEY=${{ secrets.TAILSCALE_SERVICE_AUTHKEY }}

View File

@ -13,7 +13,8 @@
<env name="OBJECT_STORAGE_ENABLED" value="True" />
<env name="WORKER_CONCURRENCY" value="2" />
<env name="SESSION_RECORDING_BLOB_PROCESSING_TEAMS" value="all" />
<env name="SESSION_RECORDING_SUMMARY_INGESTION_ENABLED_TEAMS" value="all" />
</envs>
<method v="2" />
</configuration>
</component>
</component>

View File

@ -2,6 +2,14 @@ import { EventType, IncrementalSource, eventWithTime } from '@rrweb/types'
import { Dayjs } from 'lib/dayjs'
import { RecordingSegment, RecordingSnapshot } from '~/types'
/**
* This file is copied into the plugin server to calculate activeMilliseconds on ingestion
* plugin-server/src/main/ingestion-queues/session-recording/snapshot-segmenter.ts
*
* Changes here should be reflected there
* TODO add code sharing between plugin-server and front-end so that this duplication is unnecessary
*/
const activeSources = [
IncrementalSource.MouseMove,
IncrementalSource.MouseInteraction,

View File

@ -113,13 +113,15 @@ export function getDefaultConfig(): PluginsServerConfig {
USE_KAFKA_FOR_SCHEDULED_TASKS: true,
CLOUD_DEPLOYMENT: 'default', // Used as a Sentry tag
SESSION_RECORDING_BLOB_PROCESSING_TEAMS: '', // TODO: BW Change this to 'all' when we release it fully
SESSION_RECORDING_BLOB_PROCESSING_TEAMS: '', // TODO: Change this to 'all' when we release it fully
SESSION_RECORDING_LOCAL_DIRECTORY: '.tmp/sessions',
SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS: 60 * 10, // NOTE: 10 minutes
SESSION_RECORDING_MAX_BUFFER_SIZE_KB: ['dev', 'test'].includes(process.env.NODE_ENV || 'undefined')
? 1024 // NOTE: ~1MB in dev or test, so that even with gzipped content we still flush pretty frequently
: 1024 * 50, // ~50MB after compression in prod
SESSION_RECORDING_REMOTE_FOLDER: 'session_recordings',
SESSION_RECORDING_SUMMARY_INGESTION_ENABLED_TEAMS: '', // TODO: Change this to 'all' when we release it fully
}
}
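
For reference, SESSION_RECORDING_SUMMARY_INGESTION_ENABLED_TEAMS is a comma-separated allowlist, with 'all' as a special value. The real parsing lives in the TypeScript consumer further down; this is only an illustrative Python restatement of the same rule, with made-up team ids.

from typing import Optional, Set

def parse_enabled_teams(setting: str) -> Optional[Set[int]]:
    """'' enables the feature for no team, 'all' enables it for every team
    (represented here as None), and '2,42' enables it for just those teams."""
    if setting == "all":
        return None
    return {int(team_id) for team_id in setting.split(",") if team_id.strip()}

def is_summary_enabled(enabled_teams: Optional[Set[int]], team_id: int) -> bool:
    return enabled_teams is None or team_id in enabled_teams

assert is_summary_enabled(parse_enabled_teams("all"), 42)
assert is_summary_enabled(parse_enabled_teams("2,42"), 42)
assert not is_summary_enabled(parse_enabled_teams(""), 42)
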

View File

@ -10,6 +10,7 @@ export const KAFKA_PERSON = `${prefix}clickhouse_person${suffix}`
export const KAFKA_PERSON_UNIQUE_ID = `${prefix}clickhouse_person_unique_id${suffix}`
export const KAFKA_PERSON_DISTINCT_ID = `${prefix}clickhouse_person_distinct_id${suffix}`
export const KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = `${prefix}clickhouse_session_recording_events${suffix}`
export const KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = `${prefix}clickhouse_session_replay_events${suffix}`
export const KAFKA_PERFORMANCE_EVENTS = `${prefix}clickhouse_performance_events${suffix}`
export const KAFKA_EVENTS_PLUGIN_INGESTION = `${prefix}events_plugin_ingestion${suffix}`
export const KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW = `${prefix}events_plugin_ingestion_overflow${suffix}`

View File

@ -10,8 +10,7 @@ import { instrumentEachBatch, setupEventHandlers } from './kafka-queue'
import { latestOffsetTimestampGauge } from './metrics'
// The valid task types that can be scheduled.
// TODO: not sure if there is another place that defines these but it would be
// good to unify.
// TODO: not sure if there is another place that defines these but it would be good to unify.
const taskTypes = ['runEveryMinute', 'runEveryHour', 'runEveryDay'] as const
export const startScheduledTasksConsumer = async ({

View File

@ -1,8 +1,10 @@
import { PluginEvent } from '@posthog/plugin-scaffold'
import { captureException } from '@sentry/node'
import { HighLevelProducer as RdKafkaProducer, Message, NumberNullUndefined } from 'node-rdkafka-acosom'
import {
KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS,
KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS,
KAFKA_PERFORMANCE_EVENTS,
KAFKA_SESSION_RECORDING_EVENTS,
KAFKA_SESSION_RECORDING_EVENTS_DLQ,
@ -14,7 +16,12 @@ import { createKafkaProducer, disconnectProducer, flushProducer, produce } from
import { PipelineEvent, RawEventMessage, Team } from '../../../types'
import { KafkaConfig } from '../../../utils/db/hub'
import { status } from '../../../utils/status'
import { createPerformanceEvent, createSessionRecordingEvent } from '../../../worker/ingestion/process-event'
import {
createPerformanceEvent,
createSessionRecordingEvent,
createSessionReplayEvent,
SummarizedSessionRecordingEvent,
} from '../../../worker/ingestion/process-event'
import { TeamManager } from '../../../worker/ingestion/team-manager'
import { parseEventTimestamp } from '../../../worker/ingestion/timestamps'
import { eventDroppedCounter } from '../metrics'
@ -25,12 +32,14 @@ export const startSessionRecordingEventsConsumer = async ({
consumerMaxBytes,
consumerMaxBytesPerPartition,
consumerMaxWaitMs,
summaryIngestionEnabledTeams,
}: {
teamManager: TeamManager
kafkaConfig: KafkaConfig
consumerMaxBytes: number
consumerMaxBytesPerPartition: number
consumerMaxWaitMs: number
summaryIngestionEnabledTeams: string
}) => {
/*
For Session Recordings we need to prepare the data for ClickHouse.
@ -59,7 +68,13 @@ export const startSessionRecordingEventsConsumer = async ({
const connectionConfig = createRdConnectionConfigFromEnvVars(kafkaConfig)
const producer = await createKafkaProducer(connectionConfig)
const eachBatchWithContext = eachBatch({ teamManager, producer })
const eachBatchWithContext = eachBatch({
teamManager,
producer,
summaryEnabledTeams:
summaryIngestionEnabledTeams === 'all' ? null : summaryIngestionEnabledTeams.split(',').map((teamId) => parseInt(teamId, 10)),
})
// Create a node-rdkafka consumer that fetches batches of messages, runs
// eachBatchWithContext, then commits offsets for the batch.
@ -84,7 +99,15 @@ export const startSessionRecordingEventsConsumer = async ({
}
export const eachBatch =
({ teamManager, producer }: { teamManager: TeamManager; producer: RdKafkaProducer }) =>
({
teamManager,
producer,
summaryEnabledTeams,
}: {
teamManager: TeamManager
producer: RdKafkaProducer
summaryEnabledTeams: number[] | null
}) =>
async (messages: Message[]) => {
// To start with, we simply process each message in turn,
// without attempting to perform any concurrency. There is a lot
@ -105,7 +128,7 @@ export const eachBatch =
// DependencyUnavailableError error to distinguish between
// intermittent and permanent errors.
const pendingProduceRequests: Promise<NumberNullUndefined>[] = []
const eachMessageWithContext = eachMessage({ teamManager, producer })
const eachMessageWithContext = eachMessage({ teamManager, producer, summaryEnabledTeams })
for (const message of messages) {
const results = await retryOnDependencyUnavailableError(() => eachMessageWithContext(message))
@ -148,7 +171,15 @@ export const eachBatch =
}
const eachMessage =
({ teamManager, producer }: { teamManager: TeamManager; producer: RdKafkaProducer }) =>
({
teamManager,
producer,
summaryEnabledTeams,
}: {
teamManager: TeamManager
producer: RdKafkaProducer
summaryEnabledTeams: number[] | null
}) =>
async (message: Message) => {
// For each message, we:
//
@ -252,7 +283,23 @@ const eachMessage =
event.properties || {}
)
return [
let replayRecord: null | SummarizedSessionRecordingEvent = null
try {
if (summaryEnabledTeams === null || summaryEnabledTeams?.includes(team.id)) {
replayRecord = createSessionReplayEvent(
messagePayload.uuid,
team.id,
messagePayload.distinct_id,
event.ip,
event.properties || {}
)
}
} catch (e) {
status.warn('??', 'session_replay_summarizer_error', { error: e })
captureException(e)
}
const producePromises = [
produce({
producer,
topic: KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS,
@ -260,6 +307,18 @@ const eachMessage =
key: message.key ? Buffer.from(message.key) : null,
}),
]
if (replayRecord) {
producePromises.push(
produce({
producer,
topic: KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS,
value: Buffer.from(JSON.stringify(replayRecord)),
key: message.key ? Buffer.from(message.key) : null,
})
)
}
return producePromises
} else if (event.event === '$performance_event') {
const clickHouseRecord = createPerformanceEvent(
messagePayload.uuid,

View File

@ -0,0 +1,111 @@
/**
* This file is a cut-down version of the segmenter.ts
* https://github.com/PostHog/posthog/blob/db2deaf650d2eca9addba5e3f304a17a21041f25/frontend/src/scenes/session-recordings/player/utils/segmenter.ts
*
* It has been modified to not need the same dependencies
* Any changes may need to be sync'd between the two
*/
const activeSources = [1, 2, 3, 4, 5, 6, 7, 12]
const ACTIVITY_THRESHOLD_MS = 5000
export interface RRWebEventSummaryData {
href?: string
source?: number
payload?: Record<string, any>
}
export interface RRWebEventSummary {
timestamp: number
type: number
data: RRWebEventSummaryData
windowId: string
}
interface RecordingSegment {
kind: 'window' | 'buffer' | 'gap'
startTimestamp: number // Epoch time that the segment starts
endTimestamp: number // Epoch time that the segment ends
durationMs: number
windowId?: string
isActive: boolean
}
const isActiveEvent = (event: RRWebEventSummary): boolean => {
return event.type === 3 && activeSources.includes(event.data?.source || -1)
}
const createSegments = (snapshots: RRWebEventSummary[]): RecordingSegment[] => {
let segments: RecordingSegment[] = []
let activeSegment!: Partial<RecordingSegment>
let lastActiveEventTimestamp = 0
snapshots.forEach((snapshot) => {
const eventIsActive = isActiveEvent(snapshot)
lastActiveEventTimestamp = eventIsActive ? snapshot.timestamp : lastActiveEventTimestamp
// When do we create a new segment?
// 1. If we don't have one yet
let isNewSegment = !activeSegment
// 2. If it is currently inactive but a new "active" event comes in
if (eventIsActive && !activeSegment?.isActive) {
isNewSegment = true
}
// 3. If it is currently active but no new active event has been seen for the activity threshold
if (activeSegment?.isActive && lastActiveEventTimestamp + ACTIVITY_THRESHOLD_MS < snapshot.timestamp) {
isNewSegment = true
}
// 4. If windowId changes we create a new segment
if (activeSegment?.windowId !== snapshot.windowId) {
isNewSegment = true
}
if (isNewSegment) {
if (activeSegment) {
segments.push(activeSegment as RecordingSegment)
}
activeSegment = {
kind: 'window',
startTimestamp: snapshot.timestamp,
windowId: snapshot.windowId,
isActive: eventIsActive,
}
}
activeSegment.endTimestamp = snapshot.timestamp
})
if (activeSegment) {
segments.push(activeSegment as RecordingSegment)
}
segments = segments.map((segment) => {
// These can all be done in a loop at the end...
segment.durationMs = segment.endTimestamp - segment.startTimestamp
return segment
})
return segments
}
/**
* TODO add code sharing between plugin-server and front-end so that this method can
* call the same createSegments function as the front-end
*/
export const activeMilliseconds = (snapshots: RRWebEventSummary[]): number => {
const segments = createSegments(snapshots)
return segments.reduce((acc, segment) => {
if (segment.isActive) {
// if the segment is active but has no duration we count it as 1ms
// to distinguish it from segments with no activity at all
return acc + Math.max(1, segment.durationMs)
}
return acc
}, 0)
}
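
To make the activity calculation concrete, here is an illustrative Python restatement of the idea, not the plugin-server implementation (the TypeScript above also folds inactive events into segments): active events that share a windowId and fall within the 5 second threshold of each other form one segment, and each active segment contributes at least 1ms. The sample data is the multi-window case from the tests further down.

ACTIVITY_THRESHOLD_MS = 5000
ACTIVE_SOURCES = {1, 2, 3, 4, 5, 6, 7, 12}

def active_milliseconds(events: list[dict]) -> int:
    # Keep only active incremental events, grouped by window and ordered by time.
    active = sorted(
        (e for e in events if e.get("type") == 3 and e.get("data", {}).get("source") in ACTIVE_SOURCES),
        key=lambda e: (str(e.get("windowId")), e["timestamp"]),
    )
    total, segment_start, previous = 0, None, None
    for event in active:
        starts_new_segment = (
            previous is None
            or event.get("windowId") != previous.get("windowId")
            or event["timestamp"] - previous["timestamp"] > ACTIVITY_THRESHOLD_MS
        )
        if starts_new_segment:
            if previous is not None:
                # an active segment with no duration still counts as 1ms
                total += max(1, previous["timestamp"] - segment_start)
            segment_start = event["timestamp"]
        previous = event
    if previous is not None:
        total += max(1, previous["timestamp"] - segment_start)
    return total

# 1s + 2s + 3s of activity across three windows, as in the test data below.
events = [
    {"timestamp": 1682449093000, "type": 3, "data": {"source": 2}, "windowId": "1"},
    {"timestamp": 1682449094000, "type": 3, "data": {"source": 2}, "windowId": "1"},
    {"timestamp": 1682449095000, "type": 3, "data": {"source": 2}, "windowId": "2"},
    {"timestamp": 1682449097000, "type": 3, "data": {"source": 2}, "windowId": "2"},
    {"timestamp": 1682449096000, "type": 3, "data": {"source": 2}, "windowId": "3"},
    {"timestamp": 1682449099000, "type": 3, "data": {"source": 2}, "windowId": "3"},
]
assert active_milliseconds(events) == 6000
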

View File

@ -405,6 +405,7 @@ export async function startPluginsServer(
consumerMaxBytes: serverConfig.KAFKA_CONSUMPTION_MAX_BYTES,
consumerMaxBytesPerPartition: serverConfig.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
consumerMaxWaitMs: serverConfig.KAFKA_CONSUMPTION_MAX_WAIT_MS,
summaryIngestionEnabledTeams: serverConfig.SESSION_RECORDING_SUMMARY_INGESTION_ENABLED_TEAMS,
})
stopSessionRecordingEventsConsumer = stop
joinSessionRecordingEventsConsumer = join

View File

@ -188,6 +188,8 @@ export interface PluginsServerConfig {
SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS: number
SESSION_RECORDING_MAX_BUFFER_SIZE_KB: number
SESSION_RECORDING_REMOTE_FOLDER: string
SESSION_RECORDING_SUMMARY_INGESTION_ENABLED_TEAMS: string
}
export interface Hub extends PluginsServerConfig {

View File

@ -3,6 +3,7 @@ import { PluginEvent, Properties } from '@posthog/plugin-scaffold'
import * as Sentry from '@sentry/node'
import { DateTime } from 'luxon'
import { activeMilliseconds, RRWebEventSummary } from '../../main/ingestion-queues/session-recording/snapshot-segmenter'
import {
Element,
GroupTypeIndex,
@ -272,6 +273,81 @@ export const createSessionRecordingEvent = (
return data
}
export interface SummarizedSessionRecordingEvent {
uuid: string
first_timestamp: string
last_timestamp: string
team_id: number
distinct_id: string
session_id: string
first_url: string | undefined
click_count: number
keypress_count: number
mouse_activity_count: number
active_milliseconds: number
}
export const createSessionReplayEvent = (
uuid: string,
team_id: number,
distinct_id: string,
ip: string | null,
properties: Properties
) => {
const eventsSummaries: RRWebEventSummary[] = properties['$snapshot_data']?.['events_summary'] || []
const timestamps = eventsSummaries
.filter((eventSummary: RRWebEventSummary) => {
return !!eventSummary?.timestamp
})
.map((eventSummary: RRWebEventSummary) => {
return castTimestampOrNow(DateTime.fromMillis(eventSummary.timestamp), TimestampFormat.ClickHouse)
})
.sort()
if (eventsSummaries.length === 0 || timestamps.length === 0) {
return null
}
let clickCount = 0
let keypressCount = 0
let mouseActivity = 0
let url: string | undefined = undefined
eventsSummaries.forEach((eventSummary: RRWebEventSummary) => {
if (eventSummary.type === 3) {
mouseActivity += 1
if (eventSummary.data?.source === 2) {
clickCount += 1
}
if (eventSummary.data?.source === 5) {
keypressCount += 1
}
}
if (!!eventSummary.data?.href?.trim().length && url === undefined) {
url = eventSummary.data.href
}
})
const activeTime = activeMilliseconds(eventsSummaries)
const data: SummarizedSessionRecordingEvent = {
uuid,
team_id: team_id,
distinct_id: distinct_id,
session_id: properties['$session_id'],
first_timestamp: timestamps[0],
last_timestamp: timestamps[timestamps.length - 1],
click_count: clickCount,
keypress_count: keypressCount,
mouse_activity_count: mouseActivity,
first_url: url,
active_milliseconds: activeTime,
}
return data
}
export function createPerformanceEvent(uuid: string, team_id: number, distinct_id: string, properties: Properties) {
const data: Partial<RawPerformanceEvent> = {
uuid,

View File

@ -18,7 +18,7 @@ describe('session-recordings-consumer', () => {
beforeEach(() => {
postgres = new Pool({ connectionString: defaultConfig.DATABASE_URL })
teamManager = new TeamManager(postgres, {} as any)
eachBachWithDependencies = eachBatch({ producer, teamManager })
eachBachWithDependencies = eachBatch({ producer, teamManager, summaryEnabledTeams: [] })
})
afterEach(() => {
@ -63,4 +63,82 @@ describe('session-recordings-consumer', () => {
// Should have sent to the DLQ.
expect(producer.produce).toHaveBeenCalledTimes(1)
})
test('eachBatch emits to only one topic', async () => {
const organizationId = await createOrganization(postgres)
const teamId = await createTeam(postgres, organizationId)
await eachBachWithDependencies([
{
key: 'test',
value: JSON.stringify({ team_id: teamId, data: JSON.stringify({ event: '$snapshot' }) }),
timestamp: 123,
},
])
expect(producer.produce).toHaveBeenCalledTimes(1)
})
test('eachBatch can emit to two topics', async () => {
const organizationId = await createOrganization(postgres)
const teamId = await createTeam(postgres, organizationId)
const eachBachWithDependencies: any = eachBatch({ producer, teamManager, summaryEnabledTeams: null })
await eachBachWithDependencies([
{
key: 'test',
value: JSON.stringify({
team_id: teamId,
data: JSON.stringify({
event: '$snapshot',
properties: { $snapshot_data: { events_summary: [{ timestamp: 12345 }] } },
}),
}),
timestamp: 123,
},
])
expect(producer.produce).toHaveBeenCalledTimes(2)
})
test('eachBatch can emit to two topics for a specific team', async () => {
const organizationId = await createOrganization(postgres)
const teamId = await createTeam(postgres, organizationId)
const eachBachWithDependencies: any = eachBatch({ producer, teamManager, summaryEnabledTeams: [teamId] })
await eachBachWithDependencies([
{
key: 'test',
value: JSON.stringify({
team_id: teamId,
data: JSON.stringify({
event: '$snapshot',
properties: { $snapshot_data: { events_summary: [{ timestamp: 12345 }] } },
}),
}),
timestamp: 123,
},
])
expect(producer.produce).toHaveBeenCalledTimes(2)
})
test('eachBatch can emit to only one topic when team is not summary enabled', async () => {
const organizationId = await createOrganization(postgres)
const teamId = await createTeam(postgres, organizationId)
const eachBachWithDependencies: any = eachBatch({ producer, teamManager, summaryEnabledTeams: [teamId + 1] })
await eachBachWithDependencies([
{
key: 'test',
value: JSON.stringify({ team_id: teamId, data: JSON.stringify({ event: '$snapshot' }) }),
timestamp: 123,
},
])
expect(producer.produce).toHaveBeenCalledTimes(1)
})
})

View File

@ -11,6 +11,7 @@ import * as IORedis from 'ioredis'
import { DateTime } from 'luxon'
import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../../src/config/kafka-topics'
import { RRWebEventSummary } from '../../src/main/ingestion-queues/session-recording/snapshot-segmenter'
import {
ClickHouseEvent,
Database,
@ -29,7 +30,9 @@ import { EventPipelineRunner } from '../../src/worker/ingestion/event-pipeline/r
import {
createPerformanceEvent,
createSessionRecordingEvent,
createSessionReplayEvent,
EventsProcessor,
SummarizedSessionRecordingEvent,
} from '../../src/worker/ingestion/process-event'
import { delayUntilEventIngested, resetTestDatabaseClickhouse } from '../helpers/clickhouse'
import { resetKafka } from '../helpers/kafka'
@ -1208,6 +1211,154 @@ test('snapshot event stored as session_recording_event', () => {
window_id: undefined,
})
})
const sessionReplayEventTestCases: {
snapshotData: { events_summary: RRWebEventSummary[] }
expected: Pick<
SummarizedSessionRecordingEvent,
| 'click_count'
| 'keypress_count'
| 'mouse_activity_count'
| 'first_url'
| 'first_timestamp'
| 'last_timestamp'
| 'active_milliseconds'
>
}[] = [
{
snapshotData: { events_summary: [{ timestamp: 1682449093469, type: 3, data: { source: 2 }, windowId: '1' }] },
expected: {
click_count: 1,
keypress_count: 0,
mouse_activity_count: 1,
first_url: undefined,
first_timestamp: '2023-04-25 18:58:13.469',
last_timestamp: '2023-04-25 18:58:13.469',
active_milliseconds: 1, // one event, but it's active, so active time is 1ms not 0
},
},
{
snapshotData: { events_summary: [{ timestamp: 1682449093469, type: 3, data: { source: 5 }, windowId: '1' }] },
expected: {
click_count: 0,
keypress_count: 1,
mouse_activity_count: 1,
first_url: undefined,
first_timestamp: '2023-04-25 18:58:13.469',
last_timestamp: '2023-04-25 18:58:13.469',
active_milliseconds: 1, // one event, but it's active, so active time is 1ms not 0
},
},
{
snapshotData: {
events_summary: [
{
timestamp: 1682449093693,
type: 5,
data: {
payload: {
// doesn't match because href is nested in payload
href: 'http://127.0.0.1:8000/home',
},
},
windowId: '1',
},
{
timestamp: 1682449093469,
type: 4,
data: {
href: 'http://127.0.0.1:8000/second/url',
},
windowId: '1',
},
],
},
expected: {
click_count: 0,
keypress_count: 0,
mouse_activity_count: 0,
first_url: 'http://127.0.0.1:8000/second/url',
first_timestamp: '2023-04-25 18:58:13.469',
last_timestamp: '2023-04-25 18:58:13.693',
active_milliseconds: 0, // no data.source, so no activity
},
},
{
snapshotData: {
events_summary: [
// three windows with 1 second, 2 seconds, and 3 seconds of activity
// even though they overlap they should be summed separately
{ timestamp: 1682449093000, type: 3, data: { source: 2 }, windowId: '1' },
{ timestamp: 1682449094000, type: 3, data: { source: 2 }, windowId: '1' },
{ timestamp: 1682449095000, type: 3, data: { source: 2 }, windowId: '2' },
{ timestamp: 1682449097000, type: 3, data: { source: 2 }, windowId: '2' },
{ timestamp: 1682449096000, type: 3, data: { source: 2 }, windowId: '3' },
{ timestamp: 1682449099000, type: 3, data: { source: 2 }, windowId: '3' },
],
},
expected: {
click_count: 6,
keypress_count: 0,
mouse_activity_count: 6,
first_url: undefined,
first_timestamp: '2023-04-25 18:58:13.000',
last_timestamp: '2023-04-25 18:58:19.000',
active_milliseconds: 6000, // can sum up the activity across windows
},
},
]
sessionReplayEventTestCases.forEach(({ snapshotData, expected }) => {
test(`snapshot event ${JSON.stringify(snapshotData)} can be stored as session_replay_event`, () => {
const data = createSessionReplayEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', '', {
$session_id: 'abcf-efg',
$snapshot_data: snapshotData,
} as any as Properties)
const expectedEvent: SummarizedSessionRecordingEvent = {
distinct_id: '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4',
session_id: 'abcf-efg',
team_id: 2,
uuid: 'some-id',
...expected,
}
expect(data).toEqual(expectedEvent)
})
})
test(`snapshot event with no event summary is ignored`, () => {
const data = createSessionReplayEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', '', {
$session_id: 'abcf-efg',
$snapshot_data: {},
} as any as Properties)
expect(data).toEqual(null)
})
test(`snapshot event with no event summary timestamps is ignored`, () => {
const data = createSessionReplayEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', '', {
$session_id: 'abcf-efg',
$snapshot_data: {
events_summary: [
{
type: 5,
data: {
payload: {
// doesn't match because href is nested in payload
href: 'http://127.0.0.1:8000/home',
},
},
},
{
type: 4,
data: {
href: 'http://127.0.0.1:8000/second/url',
},
},
],
},
} as any as Properties)
expect(data).toEqual(null)
})
test('performance event stored as performance_event', () => {
const data = createPerformanceEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', {

View File

@ -0,0 +1,17 @@
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.models.session_replay_event.sql import (
SESSION_REPLAY_EVENTS_TABLE_MV_SQL,
KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL,
SESSION_REPLAY_EVENTS_TABLE_SQL,
DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL,
WRITABLE_SESSION_REPLAY_EVENTS_TABLE_SQL,
)
operations = [
run_sql_with_exceptions(WRITABLE_SESSION_REPLAY_EVENTS_TABLE_SQL()),
run_sql_with_exceptions(DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL()),
run_sql_with_exceptions(SESSION_REPLAY_EVENTS_TABLE_SQL()),
run_sql_with_exceptions(KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL()),
run_sql_with_exceptions(SESSION_REPLAY_EVENTS_TABLE_MV_SQL()),
]

View File

@ -28,6 +28,12 @@ from posthog.models.person_overrides.sql import (
PERSON_OVERRIDES_CREATE_TABLE_SQL,
)
from posthog.models.session_recording_event.sql import *
from posthog.models.session_replay_event.sql import (
KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL,
DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL,
SESSION_REPLAY_EVENTS_TABLE_SQL,
SESSION_REPLAY_EVENTS_TABLE_MV_SQL,
)
CREATE_MERGETREE_TABLE_QUERIES = (
CREATE_COHORTPEOPLE_TABLE_SQL,
@ -44,6 +50,7 @@ CREATE_MERGETREE_TABLE_QUERIES = (
INGESTION_WARNINGS_DATA_TABLE_SQL,
APP_METRICS_DATA_TABLE_SQL,
PERFORMANCE_EVENTS_TABLE_SQL,
SESSION_REPLAY_EVENTS_TABLE_SQL,
)
CREATE_DISTRIBUTED_TABLE_QUERIES = (
WRITABLE_EVENTS_TABLE_SQL,
@ -54,6 +61,7 @@ CREATE_DISTRIBUTED_TABLE_QUERIES = (
DISTRIBUTED_APP_METRICS_TABLE_SQL,
WRITABLE_PERFORMANCE_EVENTS_TABLE_SQL,
DISTRIBUTED_PERFORMANCE_EVENTS_TABLE_SQL,
DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL,
)
CREATE_KAFKA_TABLE_QUERIES = (
KAFKA_DEAD_LETTER_QUEUE_TABLE_SQL,
@ -68,6 +76,7 @@ CREATE_KAFKA_TABLE_QUERIES = (
KAFKA_INGESTION_WARNINGS_TABLE_SQL,
KAFKA_APP_METRICS_TABLE_SQL,
KAFKA_PERFORMANCE_EVENTS_TABLE_SQL,
KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL,
)
CREATE_MV_TABLE_QUERIES = (
DEAD_LETTER_QUEUE_TABLE_MV_SQL,
@ -82,6 +91,7 @@ CREATE_MV_TABLE_QUERIES = (
INGESTION_WARNINGS_MV_TABLE_SQL,
APP_METRICS_MV_TABLE_SQL,
PERFORMANCE_EVENTS_TABLE_MV_SQL,
SESSION_REPLAY_EVENTS_TABLE_MV_SQL,
)
CREATE_TABLE_QUERIES = (

View File

@ -318,6 +318,25 @@
'
---
# name: test_create_kafka_table_with_different_kafka_host[kafka_session_replay_events]
'
CREATE TABLE IF NOT EXISTS kafka_session_replay_events ON CLUSTER 'posthog'
(
session_id VARCHAR,
team_id Int64,
distinct_id VARCHAR,
first_timestamp DateTime64(6, 'UTC'),
last_timestamp DateTime64(6, 'UTC'),
first_url Nullable(VARCHAR),
click_count Int64,
keypress_count Int64,
mouse_activity_count Int64,
active_milliseconds Int64
) ENGINE = Kafka('test.kafka.broker:9092', 'clickhouse_session_replay_events_test', 'group1', 'JSONEachRow')
'
---
# name: test_create_table_query[app_metrics]
'
@ -880,6 +899,25 @@
'
---
# name: test_create_table_query[kafka_session_replay_events]
'
CREATE TABLE IF NOT EXISTS kafka_session_replay_events ON CLUSTER 'posthog'
(
session_id VARCHAR,
team_id Int64,
distinct_id VARCHAR,
first_timestamp DateTime64(6, 'UTC'),
last_timestamp DateTime64(6, 'UTC'),
first_url Nullable(VARCHAR),
click_count Int64,
keypress_count Int64,
mouse_activity_count Int64,
active_milliseconds Int64
) ENGINE = Kafka('kafka:9092', 'clickhouse_session_replay_events_test', 'group1', 'JSONEachRow')
'
---
# name: test_create_table_query[performance_events]
'
@ -1273,6 +1311,60 @@
'
---
# name: test_create_table_query[session_replay_events]
'
CREATE TABLE IF NOT EXISTS session_replay_events ON CLUSTER 'posthog'
(
-- part of order by so will aggregate correctly
session_id VARCHAR,
-- part of order by so will aggregate correctly
team_id Int64,
-- ClickHouse will pick any value of distinct_id for the session
-- this is fine since even if the distinct_id changes during a session
-- it will still (or should still) map to the same person
distinct_id VARCHAR,
min_first_timestamp SimpleAggregateFunction(min, DateTime64(6, 'UTC')),
max_last_timestamp SimpleAggregateFunction(max, DateTime64(6, 'UTC')),
first_url AggregateFunction(argMin, Nullable(VARCHAR), DateTime64(6, 'UTC')),
click_count SimpleAggregateFunction(sum, Int64),
keypress_count SimpleAggregateFunction(sum, Int64),
mouse_activity_count SimpleAggregateFunction(sum, Int64),
active_milliseconds SimpleAggregateFunction(sum, Int64)
) ENGINE = Distributed('posthog', 'posthog_test', 'sharded_session_replay_events', sipHash64(distinct_id))
'
---
# name: test_create_table_query[session_replay_events_mv]
'
CREATE MATERIALIZED VIEW IF NOT EXISTS session_replay_events_mv ON CLUSTER 'posthog'
TO posthog_test.writable_session_replay_events
AS SELECT
session_id,
team_id,
any(distinct_id) as distinct_id,
min(first_timestamp) AS min_first_timestamp,
max(last_timestamp) AS max_last_timestamp,
-- TRICKY: ClickHouse will pick a relatively random first_url
-- when it collapses the aggregating merge tree
-- unless we teach it what we want...
-- argMin ignores null values
-- so this will get the first non-null value of first_url
-- for each group of session_id and team_id
-- by min of first_timestamp in the batch
-- this is an aggregate function, not a simple aggregate function
-- so we have to write to argMinState, and query with argMinMerge
argMinState(first_url, first_timestamp) as first_url,
sum(click_count) as click_count,
sum(keypress_count) as keypress_count,
sum(mouse_activity_count) as mouse_activity_count,
sum(active_milliseconds) as active_milliseconds
FROM posthog_test.kafka_session_replay_events
group by session_id, team_id
'
---
# name: test_create_table_query[sharded_app_metrics]
'
@ -1475,6 +1567,44 @@
'
---
# name: test_create_table_query[sharded_session_replay_events]
'
CREATE TABLE IF NOT EXISTS sharded_session_replay_events ON CLUSTER 'posthog'
(
-- part of order by so will aggregate correctly
session_id VARCHAR,
-- part of order by so will aggregate correctly
team_id Int64,
-- ClickHouse will pick any value of distinct_id for the session
-- this is fine since even if the distinct_id changes during a session
-- it will still (or should still) map to the same person
distinct_id VARCHAR,
min_first_timestamp SimpleAggregateFunction(min, DateTime64(6, 'UTC')),
max_last_timestamp SimpleAggregateFunction(max, DateTime64(6, 'UTC')),
first_url AggregateFunction(argMin, Nullable(VARCHAR), DateTime64(6, 'UTC')),
click_count SimpleAggregateFunction(sum, Int64),
keypress_count SimpleAggregateFunction(sum, Int64),
mouse_activity_count SimpleAggregateFunction(sum, Int64),
active_milliseconds SimpleAggregateFunction(sum, Int64)
) ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.session_replay_events', '{replica}')
PARTITION BY toYYYYMM(min_first_timestamp)
-- order by is used by the aggregating merge tree engine to
-- identify candidates to merge, e.g. toDate(min_first_timestamp)
-- would mean we would have one row per day per session_id
-- if CH could completely merge to match the order by
-- it is also used to organise data to make queries faster
-- we want the fewest rows possible but also the fastest queries
-- since we query by date and not by time
-- and order by must be in order of increasing cardinality
-- so we order by date first, then team_id, then session_id
-- hopefully, this is a good balance between the two
ORDER BY (toDate(min_first_timestamp), team_id, session_id)
SETTINGS index_granularity=512
'
---
# name: test_create_table_query[writable_events]
'
@ -2051,3 +2181,41 @@
'
---
# name: test_create_table_query_replicated_and_storage[sharded_session_replay_events]
'
CREATE TABLE IF NOT EXISTS sharded_session_replay_events ON CLUSTER 'posthog'
(
-- part of order by so will aggregate correctly
session_id VARCHAR,
-- part of order by so will aggregate correctly
team_id Int64,
-- ClickHouse will pick any value of distinct_id for the session
-- this is fine since even if the distinct_id changes during a session
-- it will still (or should still) map to the same person
distinct_id VARCHAR,
min_first_timestamp SimpleAggregateFunction(min, DateTime64(6, 'UTC')),
max_last_timestamp SimpleAggregateFunction(max, DateTime64(6, 'UTC')),
first_url AggregateFunction(argMin, Nullable(VARCHAR), DateTime64(6, 'UTC')),
click_count SimpleAggregateFunction(sum, Int64),
keypress_count SimpleAggregateFunction(sum, Int64),
mouse_activity_count SimpleAggregateFunction(sum, Int64),
active_milliseconds SimpleAggregateFunction(sum, Int64)
) ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.session_replay_events', '{replica}')
PARTITION BY toYYYYMM(min_first_timestamp)
-- order by is used by the aggregating merge tree engine to
-- identify candidates to merge, e.g. toDate(min_first_timestamp)
-- would mean we would have one row per day per session_id
-- if CH could completely merge to match the order by
-- it is also used to organise data to make queries faster
-- we want the fewest rows possible but also the fastest queries
-- since we query by date and not by time
-- and order by must be in order of increasing cardinality
-- so we order by date first, then team_id, then session_id
-- hopefully, this is a good balance between the two
ORDER BY (toDate(min_first_timestamp), team_id, session_id)
SETTINGS index_granularity=512
'
---

View File

@ -9,6 +9,7 @@ KAFKA_PERSON = f"{KAFKA_PREFIX}clickhouse_person{SUFFIX}"
KAFKA_PERSON_UNIQUE_ID = f"{KAFKA_PREFIX}clickhouse_person_unique_id{SUFFIX}" # DEPRECATED_DO_NOT_USE
KAFKA_PERSON_DISTINCT_ID = f"{KAFKA_PREFIX}clickhouse_person_distinct_id{SUFFIX}"
KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = f"{KAFKA_PREFIX}clickhouse_session_recording_events{SUFFIX}"
KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = f"{KAFKA_PREFIX}clickhouse_session_replay_events{SUFFIX}"
KAFKA_PERFORMANCE_EVENTS = f"{KAFKA_PREFIX}clickhouse_performance_events{SUFFIX}"
KAFKA_PLUGIN_LOG_ENTRIES = f"{KAFKA_PREFIX}plugin_log_entries{SUFFIX}"
KAFKA_DEAD_LETTER_QUEUE = f"{KAFKA_PREFIX}events_dead_letter_queue{SUFFIX}"

View File

@ -0,0 +1,136 @@
from django.conf import settings
from posthog.clickhouse.kafka_engine import kafka_engine
from posthog.clickhouse.table_engines import Distributed, ReplicationScheme, AggregatingMergeTree
from posthog.kafka_client.topics import KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS
SESSION_REPLAY_EVENTS_DATA_TABLE = lambda: "sharded_session_replay_events"
"""
Kafka needs a slightly different column setup. It receives individual events, not aggregates.
We write first_timestamp and last_timestamp as individual records;
they will be grouped as min_first_timestamp and max_last_timestamp in the main table.
"""
KAFKA_SESSION_REPLAY_EVENTS_TABLE_BASE_SQL = """
CREATE TABLE IF NOT EXISTS {table_name} ON CLUSTER '{cluster}'
(
session_id VARCHAR,
team_id Int64,
distinct_id VARCHAR,
first_timestamp DateTime64(6, 'UTC'),
last_timestamp DateTime64(6, 'UTC'),
first_url Nullable(VARCHAR),
click_count Int64,
keypress_count Int64,
mouse_activity_count Int64,
active_milliseconds Int64
) ENGINE = {engine}
"""
SESSION_REPLAY_EVENTS_TABLE_BASE_SQL = """
CREATE TABLE IF NOT EXISTS {table_name} ON CLUSTER '{cluster}'
(
-- part of order by so will aggregate correctly
session_id VARCHAR,
-- part of order by so will aggregate correctly
team_id Int64,
-- ClickHouse will pick any value of distinct_id for the session
-- this is fine since even if the distinct_id changes during a session
-- it will still (or should still) map to the same person
distinct_id VARCHAR,
min_first_timestamp SimpleAggregateFunction(min, DateTime64(6, 'UTC')),
max_last_timestamp SimpleAggregateFunction(max, DateTime64(6, 'UTC')),
first_url AggregateFunction(argMin, Nullable(VARCHAR), DateTime64(6, 'UTC')),
click_count SimpleAggregateFunction(sum, Int64),
keypress_count SimpleAggregateFunction(sum, Int64),
mouse_activity_count SimpleAggregateFunction(sum, Int64),
active_milliseconds SimpleAggregateFunction(sum, Int64)
) ENGINE = {engine}
"""
SESSION_REPLAY_EVENTS_DATA_TABLE_ENGINE = lambda: AggregatingMergeTree(
"session_replay_events", replication_scheme=ReplicationScheme.SHARDED
)
SESSION_REPLAY_EVENTS_TABLE_SQL = lambda: (
SESSION_REPLAY_EVENTS_TABLE_BASE_SQL
+ """
PARTITION BY toYYYYMM(min_first_timestamp)
-- order by is used by the aggregating merge tree engine to
-- identify candidates to merge, e.g. toDate(min_first_timestamp)
-- would mean we would have one row per day per session_id
-- if CH could completely merge to match the order by
-- it is also used to organise data to make queries faster
-- we want the fewest rows possible but also the fastest queries
-- since we query by date and not by time
-- and order by must be in order of increasing cardinality
-- so we order by date first, then team_id, then session_id
-- hopefully, this is a good balance between the two
ORDER BY (toDate(min_first_timestamp), team_id, session_id)
SETTINGS index_granularity=512
"""
).format(
table_name=SESSION_REPLAY_EVENTS_DATA_TABLE(),
cluster=settings.CLICKHOUSE_CLUSTER,
engine=SESSION_REPLAY_EVENTS_DATA_TABLE_ENGINE(),
)
KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL = lambda: KAFKA_SESSION_REPLAY_EVENTS_TABLE_BASE_SQL.format(
table_name="kafka_session_replay_events",
cluster=settings.CLICKHOUSE_CLUSTER,
engine=kafka_engine(topic=KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS),
)
SESSION_REPLAY_EVENTS_TABLE_MV_SQL = lambda: """
CREATE MATERIALIZED VIEW IF NOT EXISTS session_replay_events_mv ON CLUSTER '{cluster}'
TO {database}.{target_table}
AS SELECT
session_id,
team_id,
any(distinct_id) as distinct_id,
min(first_timestamp) AS min_first_timestamp,
max(last_timestamp) AS max_last_timestamp,
-- TRICKY: ClickHouse will pick a relatively random first_url
-- when it collapses the aggregating merge tree
-- unless we teach it what we want...
-- argMin ignores null values
-- so this will get the first non-null value of first_url
-- for each group of session_id and team_id
-- by min of first_timestamp in the batch
-- this is an aggregate function, not a simple aggregate function
-- so we have to write to argMinState, and query with argMinMerge
argMinState(first_url, first_timestamp) as first_url,
sum(click_count) as click_count,
sum(keypress_count) as keypress_count,
sum(mouse_activity_count) as mouse_activity_count,
sum(active_milliseconds) as active_milliseconds
FROM {database}.kafka_session_replay_events
group by session_id, team_id
""".format(
target_table="writable_session_replay_events",
cluster=settings.CLICKHOUSE_CLUSTER,
database=settings.CLICKHOUSE_DATABASE,
)
# Distributed engine tables are only created if CLICKHOUSE_REPLICATED
# This table is responsible for writing to sharded_session_replay_events based on a sharding key.
WRITABLE_SESSION_REPLAY_EVENTS_TABLE_SQL = lambda: SESSION_REPLAY_EVENTS_TABLE_BASE_SQL.format(
table_name="writable_session_replay_events",
cluster=settings.CLICKHOUSE_CLUSTER,
engine=Distributed(data_table=SESSION_REPLAY_EVENTS_DATA_TABLE(), sharding_key="sipHash64(distinct_id)"),
)
# This table is responsible for reading from session_replay_events on a cluster setting
DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL = lambda: SESSION_REPLAY_EVENTS_TABLE_BASE_SQL.format(
table_name="session_replay_events",
cluster=settings.CLICKHOUSE_CLUSTER,
engine=Distributed(data_table=SESSION_REPLAY_EVENTS_DATA_TABLE(), sharding_key="sipHash64(distinct_id)"),
)
DROP_SESSION_REPLAY_EVENTS_TABLE_SQL = lambda: (
f"DROP TABLE IF EXISTS {SESSION_REPLAY_EVENTS_DATA_TABLE()} ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'"
)
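
For reference, reading the aggregated table back requires argMinMerge for the first_url state column, while the SimpleAggregateFunction columns only need min/max/sum. A minimal read-side sketch (the team_id value is illustrative; the test further down runs essentially the same query):

from posthog.clickhouse.client import sync_execute

rows = sync_execute(
    """
    SELECT
        session_id,
        min(min_first_timestamp) AS start_time,
        max(max_last_timestamp) AS end_time,
        argMinMerge(first_url) AS first_url,
        sum(click_count) AS click_count,
        sum(active_milliseconds) AS active_milliseconds
    FROM session_replay_events
    WHERE team_id = %(team_id)s
    GROUP BY session_id
    """,
    {"team_id": 2},
)
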

View File

@ -0,0 +1,119 @@
from datetime import datetime
from typing import Optional
from dateutil.parser import parse
from dateutil.relativedelta import relativedelta
from posthog.kafka_client.client import ClickhouseProducer
from posthog.kafka_client.topics import KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS
from posthog.models.event.util import format_clickhouse_timestamp
from posthog.utils import cast_timestamp_or_now
INSERT_SINGLE_SESSION_REPLAY = """
INSERT INTO sharded_session_replay_events (
session_id,
team_id,
distinct_id,
min_first_timestamp,
max_last_timestamp,
first_url,
click_count,
keypress_count,
mouse_activity_count,
active_milliseconds
)
SELECT
%(session_id)s,
%(team_id)s,
%(distinct_id)s,
toDateTime64(%(first_timestamp)s, 6, 'UTC'),
toDateTime64(%(last_timestamp)s, 6, 'UTC'),
argMinState(cast(%(first_url)s, 'Nullable(String)'), toDateTime64(%(first_timestamp)s, 6, 'UTC')),
%(click_count)s,
%(keypress_count)s,
%(mouse_activity_count)s,
%(active_milliseconds)s
"""
def _sensible_first_timestamp(
first_timestamp: Optional[str | datetime], last_timestamp: Optional[str | datetime]
) -> str:
"""
Normalise the first timestamp to be used in the session replay summary.
If it is not provided but there is a last_timestamp, use an hour before that last_timestamp
Otherwise we use the current time
"""
sensible_timestamp = None
if first_timestamp is not None:
# TRICKY: check it is not a string to avoid needing to check whether it is a datetime or a FakeDatetime
if not isinstance(first_timestamp, str):
sensible_timestamp = first_timestamp.isoformat()
else:
sensible_timestamp = first_timestamp
else:
if last_timestamp is not None:
if isinstance(last_timestamp, str):
last_timestamp = parse(last_timestamp)
sensible_timestamp = (last_timestamp - relativedelta(seconds=3600)).isoformat()
return format_clickhouse_timestamp(cast_timestamp_or_now(sensible_timestamp))
def _sensible_last_timestamp(
first_timestamp: Optional[str | datetime], last_timestamp: Optional[str | datetime]
) -> str:
"""
Normalise the last timestamp to be used in the session replay summary.
If it is not provided but there is a first_timestamp, use an hour after that first_timestamp
Otherwise we use the current time
"""
sensible_timestamp = None
if last_timestamp is not None:
# TRICKY: check it is not a string to avoid needing to check whether it is a datetime or a FakeDatetime
if not isinstance(last_timestamp, str):
sensible_timestamp = last_timestamp.isoformat()
else:
sensible_timestamp = last_timestamp
else:
if first_timestamp is not None:
if isinstance(first_timestamp, str):
first_timestamp = parse(first_timestamp)
sensible_timestamp = (first_timestamp + relativedelta(seconds=3600)).isoformat()
return format_clickhouse_timestamp(cast_timestamp_or_now(sensible_timestamp))
def produce_replay_summary(
team_id: int,
session_id: Optional[str] = None,
distinct_id: Optional[str] = None,
first_timestamp: Optional[str | datetime] = None,
last_timestamp: Optional[str | datetime] = None,
first_url: Optional[str | None] = None,
click_count: Optional[int] = None,
keypress_count: Optional[int] = None,
mouse_activity_count: Optional[int] = None,
active_milliseconds: Optional[float] = None,
):
first_timestamp = _sensible_first_timestamp(first_timestamp, last_timestamp)
last_timestamp = _sensible_last_timestamp(first_timestamp, last_timestamp)
data = {
"session_id": session_id or "1",
"team_id": team_id,
"distinct_id": distinct_id or "user",
"first_timestamp": format_clickhouse_timestamp(cast_timestamp_or_now(first_timestamp)),
"last_timestamp": format_clickhouse_timestamp(cast_timestamp_or_now(last_timestamp)),
"first_url": first_url,
"click_count": click_count or 0,
"keypress_count": keypress_count or 0,
"mouse_activity_count": mouse_activity_count or 0,
"active_milliseconds": active_milliseconds or 0,
}
p = ClickhouseProducer()
# because this is in a test it will write directly using SQL not really with Kafka
p.produce(topic=KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS, sql=INSERT_SINGLE_SESSION_REPLAY, data=data)
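
A minimal call to the helper above, for reference (values are illustrative): with only last_timestamp supplied, _sensible_first_timestamp backfills a first timestamp an hour earlier, and any counts that are not passed default to zero.

produce_replay_summary(
    team_id=2,
    session_id="abcf-efg",
    distinct_id="user",
    last_timestamp="2023-04-27T14:20:42.237",
    click_count=3,
)
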

View File

@ -0,0 +1,157 @@
from datetime import datetime, timedelta
from uuid import uuid4
import pytz
from dateutil.parser import isoparse
from posthog.clickhouse.client import sync_execute
from posthog.models import Team
from posthog.models.event.util import format_clickhouse_timestamp
from posthog.queries.app_metrics.serializers import AppMetricsRequestSerializer
from posthog.queries.session_recordings.test.session_replay_sql import produce_replay_summary
from posthog.test.base import BaseTest, ClickhouseTestMixin, snapshot_clickhouse_queries
def make_filter(serializer_klass=AppMetricsRequestSerializer, **kwargs) -> AppMetricsRequestSerializer:
filter = serializer_klass(data=kwargs)
filter.is_valid(raise_exception=True)
return filter
class SessionReplaySummaryQuery:
def __init__(self, team: Team, session_id: str, reference_date: str):
self.team = team
self.session_id = session_id
self.reference_date = reference_date
def list_all(self):
params = {
"team_id": self.team.pk,
"start_time": format_clickhouse_timestamp(isoparse(self.reference_date) - timedelta(hours=48)),
"end_time": format_clickhouse_timestamp(isoparse(self.reference_date) + timedelta(hours=48)),
"session_ids": (self.session_id,),
}
results = sync_execute(
"""
select
session_id,
any(team_id),
any(distinct_id),
min(min_first_timestamp),
max(max_last_timestamp),
dateDiff('SECOND', min(min_first_timestamp), max(max_last_timestamp)) as duration,
argMinMerge(first_url) as first_url,
sum(click_count),
sum(keypress_count),
sum(mouse_activity_count),
round((sum(active_milliseconds)/1000)/duration, 2) as active_time
from session_replay_events
prewhere team_id = %(team_id)s
and min_first_timestamp >= %(start_time)s
and max_last_timestamp <= %(end_time)s
and session_id in %(session_ids)s
group by session_id
""",
params,
)
return results
class TestReceiveSummarizedSessionReplays(ClickhouseTestMixin, BaseTest):
@snapshot_clickhouse_queries
def test_session_replay_summaries_can_be_queried(self):
session_id = str(uuid4())
produce_replay_summary(
session_id=session_id,
team_id=self.team.pk,
# can CH handle a timestamp with no T
first_timestamp="2023-04-27 10:00:00.309",
last_timestamp="2023-04-27 14:20:42.237",
distinct_id=str(self.user.distinct_id),
first_url="https://first-url-ingested.com",
click_count=2,
keypress_count=2,
mouse_activity_count=2,
active_milliseconds=33624 * 1000 * 0.3, # 30% of the total expected duration
)
produce_replay_summary(
session_id=session_id,
team_id=self.team.pk,
first_timestamp="2023-04-27T19:17:38.116",
last_timestamp="2023-04-27T19:17:38.117",
distinct_id=str(self.user.distinct_id),
first_url="https://second-url-ingested.com",
click_count=2,
keypress_count=2,
mouse_activity_count=2,
)
produce_replay_summary(
session_id=session_id,
team_id=self.team.pk,
first_timestamp="2023-04-27T19:18:24.597",
last_timestamp="2023-04-27T19:20:24.597",
distinct_id=str(self.user.distinct_id),
first_url="https://third-url-ingested.com",
click_count=2,
keypress_count=2,
mouse_activity_count=2,
)
# same session but ends more than 2 days from start so excluded
produce_replay_summary(
session_id=session_id,
team_id=self.team.pk,
first_timestamp="2023-04-26T19:18:24.597",
last_timestamp="2023-04-29T19:20:24.597",
distinct_id=str(self.user.distinct_id),
first_url=None,
click_count=2,
keypress_count=2,
mouse_activity_count=2,
)
# same session but a different team so excluded
produce_replay_summary(
session_id=session_id,
team_id=self.team.pk + 100,
first_timestamp="2023-04-26T19:18:24.597",
last_timestamp="2023-04-28T19:20:24.597",
distinct_id=str(self.user.distinct_id),
first_url=None,
click_count=2,
keypress_count=2,
mouse_activity_count=2,
)
# different session so excluded
produce_replay_summary(
session_id=str(uuid4()),
team_id=self.team.pk,
first_timestamp="2023-04-26T19:18:24.597",
last_timestamp="2023-04-26T19:20:24.597",
distinct_id=str(self.user.distinct_id),
first_url=None,
click_count=2,
keypress_count=2,
mouse_activity_count=2,
)
results = SessionReplaySummaryQuery(self.team, session_id, "2023-04-26T19:18:24.597").list_all()
assert results == [
(
session_id,
self.team.pk,
str(self.user.distinct_id),
datetime(2023, 4, 27, 10, 0, 0, 309000, tzinfo=pytz.UTC),
datetime(2023, 4, 27, 19, 20, 24, 597000, tzinfo=pytz.UTC),
33624,
"https://first-url-ingested.com",
6,
6,
6,
0.3,
)
]

View File

@ -42,6 +42,11 @@ from posthog.models.session_recording_event.sql import (
DROP_SESSION_RECORDING_EVENTS_TABLE_SQL,
SESSION_RECORDING_EVENTS_TABLE_SQL,
)
from posthog.models.session_replay_event.sql import (
SESSION_REPLAY_EVENTS_TABLE_SQL,
DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL,
DROP_SESSION_REPLAY_EVENTS_TABLE_SQL,
)
from posthog.settings.utils import get_from_env, str_to_bool
persons_cache_tests: List[Dict[str, Any]] = []
@ -660,6 +665,7 @@ class ClickhouseDestroyTablesMixin(BaseTest):
TRUNCATE_PERSON_DISTINCT_ID_TABLE_SQL,
TRUNCATE_PERSON_DISTINCT_ID2_TABLE_SQL,
DROP_SESSION_RECORDING_EVENTS_TABLE_SQL(),
DROP_SESSION_REPLAY_EVENTS_TABLE_SQL(),
TRUNCATE_GROUPS_TABLE_SQL,
TRUNCATE_COHORTPEOPLE_TABLE_SQL,
TRUNCATE_PERSON_STATIC_COHORT_TABLE_SQL,
@ -667,10 +673,19 @@ class ClickhouseDestroyTablesMixin(BaseTest):
]
)
run_clickhouse_statement_in_parallel(
[EVENTS_TABLE_SQL(), PERSONS_TABLE_SQL(), SESSION_RECORDING_EVENTS_TABLE_SQL()]
[
EVENTS_TABLE_SQL(),
PERSONS_TABLE_SQL(),
SESSION_RECORDING_EVENTS_TABLE_SQL(),
SESSION_REPLAY_EVENTS_TABLE_SQL(),
]
)
run_clickhouse_statement_in_parallel(
[DISTRIBUTED_EVENTS_TABLE_SQL(), DISTRIBUTED_SESSION_RECORDING_EVENTS_TABLE_SQL()]
[
DISTRIBUTED_EVENTS_TABLE_SQL(),
DISTRIBUTED_SESSION_RECORDING_EVENTS_TABLE_SQL(),
DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL(),
]
)
def tearDown(self):
@ -682,13 +697,24 @@ class ClickhouseDestroyTablesMixin(BaseTest):
DROP_PERSON_TABLE_SQL,
TRUNCATE_PERSON_DISTINCT_ID_TABLE_SQL,
DROP_SESSION_RECORDING_EVENTS_TABLE_SQL(),
DROP_SESSION_REPLAY_EVENTS_TABLE_SQL(),
]
)
run_clickhouse_statement_in_parallel(
[
EVENTS_TABLE_SQL(),
PERSONS_TABLE_SQL(),
SESSION_RECORDING_EVENTS_TABLE_SQL(),
SESSION_REPLAY_EVENTS_TABLE_SQL(),
]
)
run_clickhouse_statement_in_parallel(
[EVENTS_TABLE_SQL(), PERSONS_TABLE_SQL(), SESSION_RECORDING_EVENTS_TABLE_SQL()]
)
run_clickhouse_statement_in_parallel(
[DISTRIBUTED_EVENTS_TABLE_SQL(), DISTRIBUTED_SESSION_RECORDING_EVENTS_TABLE_SQL()]
[
DISTRIBUTED_EVENTS_TABLE_SQL(),
DISTRIBUTED_SESSION_RECORDING_EVENTS_TABLE_SQL(),
DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL(),
]
)