From d8df34f4abda17ca331f4df3475ac5d6b70ca963 Mon Sep 17 00:00:00 2001 From: Ben White Date: Thu, 20 Jul 2023 16:41:25 +0200 Subject: [PATCH] feat: Replay events consumer (#16642) --- .run/Plugin Server.run.xml | 1 - .run/PostHog.run.xml | 1 - .vscode/launch.json | 2 +- .../session-recordings-blob-ingestion.test.ts | 135 ------------- plugin-server/src/config/config.ts | 5 +- plugin-server/src/kafka/consumer.ts | 42 ++-- .../session-recording/blob-ingester/utils.ts | 13 -- .../offset-high-water-marker.ts} | 64 +++--- .../realtime-manager.ts | 4 +- .../services/replay-events-ingester.ts | 187 ++++++++++++++++++ .../session-manager.ts | 33 ++-- ...r.ts => session-recordings-consumer-v1.ts} | 22 ++- ...r.ts => session-recordings-consumer-v2.ts} | 122 +++++------- .../session-recording/snapshot-segmenter.ts | 24 +-- .../{blob-ingester => }/types.ts | 15 +- .../session-recording/utils.ts | 26 +++ plugin-server/src/main/pluginsServer.ts | 21 +- plugin-server/src/types.ts | 7 +- .../src/worker/ingestion/process-event.ts | 56 ++---- .../session-recording/fixtures.ts | 3 +- .../offset-high-water-mark.test.ts} | 42 ++-- .../session-manager.test.ts | 4 +- ...=> session-recordings-consumer-v1.test.ts} | 30 ++- ...ecordings-consumer-v2-rebalancing.test.ts} | 12 +- ...=> session-recordings-consumer-v2.test.ts} | 6 +- .../tests/main/process-event.test.ts | 77 ++++---- posthog/api/capture.py | 13 +- .../api/test/__snapshots__/test_cohort.ambr | 16 +- posthog/api/test/test_capture.py | 61 ++---- posthog/settings/ingestion.py | 8 +- 30 files changed, 548 insertions(+), 504 deletions(-) delete mode 100644 plugin-server/functional_tests/session-recordings-blob-ingestion.test.ts delete mode 100644 plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/utils.ts rename plugin-server/src/main/ingestion-queues/session-recording/{blob-ingester/session-offset-high-water-mark.ts => services/offset-high-water-marker.ts} (72%) rename plugin-server/src/main/ingestion-queues/session-recording/{blob-ingester => services}/realtime-manager.ts (98%) create mode 100644 plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts rename plugin-server/src/main/ingestion-queues/session-recording/{blob-ingester => services}/session-manager.ts (94%) rename plugin-server/src/main/ingestion-queues/session-recording/{session-recordings-consumer.ts => session-recordings-consumer-v1.ts} (95%) rename plugin-server/src/main/ingestion-queues/session-recording/{session-recordings-blob-consumer.ts => session-recordings-consumer-v2.ts} (85%) rename plugin-server/src/main/ingestion-queues/session-recording/{blob-ingester => }/types.ts (58%) create mode 100644 plugin-server/src/main/ingestion-queues/session-recording/utils.ts rename plugin-server/tests/main/ingestion-queues/session-recording/{blob-ingester/session-offset-high-water-mark.test.ts => services/offset-high-water-mark.test.ts} (73%) rename plugin-server/tests/main/ingestion-queues/session-recording/{blob-ingester => services}/session-manager.test.ts (99%) rename plugin-server/tests/main/ingestion-queues/session-recording/{session-recordings-consumer.test.ts => session-recordings-consumer-v1.test.ts} (85%) rename plugin-server/tests/main/ingestion-queues/session-recording/{session-recordings-blob-consumer-rebalancing.test.ts => session-recordings-consumer-v2-rebalancing.test.ts} (81%) rename plugin-server/tests/main/ingestion-queues/session-recording/{session-recordings-blob-consumer.test.ts => session-recordings-consumer-v2.test.ts} 
(97%) diff --git a/.run/Plugin Server.run.xml b/.run/Plugin Server.run.xml index 4ec58fe1f43..5160d9a7228 100644 --- a/.run/Plugin Server.run.xml +++ b/.run/Plugin Server.run.xml @@ -11,7 +11,6 @@ - diff --git a/.run/PostHog.run.xml b/.run/PostHog.run.xml index ed075e33cff..fe267209927 100644 --- a/.run/PostHog.run.xml +++ b/.run/PostHog.run.xml @@ -14,7 +14,6 @@ - diff --git a/.vscode/launch.json b/.vscode/launch.json index 3c5d8f5356d..23c945f6cfb 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -70,7 +70,7 @@ "DATABASE_URL": "postgres://posthog:posthog@localhost:5432/posthog", "SKIP_SERVICE_VERSION_REQUIREMENTS": "1", "PRINT_SQL": "1", - "REPLAY_BLOB_INGESTION_TRAFFIC_RATIO": "1.0" + "REPLAY_EVENTS_NEW_CONSUMER_RATIO": "1.0" }, "console": "integratedTerminal", "python": "${workspaceFolder}/env/bin/python", diff --git a/plugin-server/functional_tests/session-recordings-blob-ingestion.test.ts b/plugin-server/functional_tests/session-recordings-blob-ingestion.test.ts deleted file mode 100644 index 2ad539c3410..00000000000 --- a/plugin-server/functional_tests/session-recordings-blob-ingestion.test.ts +++ /dev/null @@ -1,135 +0,0 @@ -import { GetObjectCommand, GetObjectCommandOutput, ListObjectsV2Command, S3Client } from '@aws-sdk/client-s3' -import { readdir, readFile } from 'fs/promises' -import { join } from 'path' -import * as zlib from 'zlib' - -import { defaultConfig } from '../src/config/config' -import { compressToString } from '../src/main/ingestion-queues/session-recording/blob-ingester/utils' -import { bufferFileDir } from '../src/main/ingestion-queues/session-recording/session-recordings-blob-consumer' -import { getObjectStorage } from '../src/main/services/object_storage' -import { UUIDT } from '../src/utils/utils' -import { capture, createOrganization, createTeam } from './api' -import { waitForExpect } from './expectations' - -let organizationId: string - -let s3: S3Client - -function generateVeryLongString(length = 1025) { - return [...Array(length)].map(() => Math.random().toString(36)[2]).join('') -} - -beforeAll(async () => { - organizationId = await createOrganization() - - const objectStorage = getObjectStorage({ - OBJECT_STORAGE_ENDPOINT: defaultConfig.OBJECT_STORAGE_ENDPOINT, - OBJECT_STORAGE_REGION: defaultConfig.OBJECT_STORAGE_REGION, - OBJECT_STORAGE_ACCESS_KEY_ID: defaultConfig.OBJECT_STORAGE_ACCESS_KEY_ID, - OBJECT_STORAGE_SECRET_ACCESS_KEY: defaultConfig.OBJECT_STORAGE_SECRET_ACCESS_KEY, - OBJECT_STORAGE_ENABLED: defaultConfig.OBJECT_STORAGE_ENABLED, - OBJECT_STORAGE_BUCKET: defaultConfig.OBJECT_STORAGE_BUCKET, - }) - if (!objectStorage) { - throw new Error('S3 not configured') - } - s3 = objectStorage.s3 -}) - -// having these tests is causing flapping failures in other tests :/ -// eg.https://github.com/PostHog/posthog/actions/runs/4802953306/jobs/8553849494 -test.skip(`single recording event writes data to local tmp file`, async () => { - const teamId = await createTeam(organizationId) - const distinctId = new UUIDT().toString() - const uuid = new UUIDT().toString() - const sessionId = new UUIDT().toString() - const veryLongString = generateVeryLongString() - await capture({ - teamId, - distinctId, - uuid, - event: '$snapshot', - properties: { - $session_id: sessionId, - $window_id: 'abc1234', - $snapshot_data: { data: compressToString(veryLongString), chunk_count: 1 }, - }, - topic: 'session_recording_events', - }) - - let tempFiles: string[] = [] - - await waitForExpect(async () => { - const files = await 
readdir(bufferFileDir(defaultConfig.SESSION_RECORDING_LOCAL_DIRECTORY)) - tempFiles = files.filter((f) => f.startsWith(`${teamId}.${sessionId}`)) - expect(tempFiles.length).toBe(1) - }) - - await waitForExpect(async () => { - const currentFile = tempFiles[0] - - const fileContents = await readFile( - join(bufferFileDir(defaultConfig.SESSION_RECORDING_LOCAL_DIRECTORY), currentFile), - 'utf8' - ) - - expect(fileContents).toEqual(`{"window_id":"abc1234","data":"${veryLongString}"}\n`) - }) -}, 40000) - -test.skip(`multiple recording events writes compressed data to s3`, async () => { - const teamId = await createTeam(organizationId) - const distinctId = new UUIDT().toString() - const sessionId = new UUIDT().toString() - - // need to send enough data to trigger the s3 upload exactly once. - // with a buffer of 1024, an estimated gzip compression of 0.1, and 1025 default length for generateAVeryLongString - // we need 25,000 events. - // if any of those things change then the number of events probably needs to change too - const captures = Array.from({ length: 25000 }).map(() => { - return capture({ - teamId, - distinctId, - uuid: new UUIDT().toString(), - event: '$snapshot', - properties: { - $session_id: sessionId, - $window_id: 'abc1234', - $snapshot_data: { data: compressToString(generateVeryLongString()), chunk_count: 1 }, - }, - topic: 'session_recording_events', - }) - }) - await Promise.all(captures) - - await waitForExpect(async () => { - const s3Files = await s3.send( - new ListObjectsV2Command({ - Bucket: defaultConfig.OBJECT_STORAGE_BUCKET, - Prefix: `${defaultConfig.SESSION_RECORDING_REMOTE_FOLDER}/team_id/${teamId}/session_id/${sessionId}`, - }) - ) - expect(s3Files.Contents?.length).toBeGreaterThanOrEqual(1) - - const s3File = s3Files.Contents?.[0] - if (!s3File) { - throw new Error('No s3File') - } - const s3FileContents: GetObjectCommandOutput = await s3.send( - new GetObjectCommand({ - Bucket: defaultConfig.OBJECT_STORAGE_BUCKET, - Key: s3File.Key, - }) - ) - const fileStream = await s3FileContents.Body?.transformToByteArray() - if (!fileStream) { - throw new Error('No fileStream') - } - const text = zlib.gunzipSync(fileStream).toString().trim() - // text contains JSON for { - // "window_id": "abc1234", - // "data": "random...string" // thousands of characters - // } - expect(text).toMatch(/{"window_id":"abc1234","data":"\w+"}/) - }, 40000) -}, 50000) diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index 8d1fefd9509..45fee373aeb 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -122,9 +122,6 @@ export function getDefaultConfig(): PluginsServerConfig { USE_KAFKA_FOR_SCHEDULED_TASKS: true, CLOUD_DEPLOYMENT: 'default', // Used as a Sentry tag - // this defaults to true, but is used to disable for testing in production - SESSION_RECORDING_ENABLE_OFFSET_HIGH_WATER_MARK_PROCESSING: true, - SESSION_RECORDING_KAFKA_HOSTS: undefined, SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL: undefined, SESSION_RECORDING_KAFKA_BATCH_SIZE: 500, @@ -143,7 +140,7 @@ export function getDefaultConfig(): PluginsServerConfig { } } -export const sessionRecordingBlobConsumerConfig = (config: PluginsServerConfig): PluginsServerConfig => { +export const sessionRecordingConsumerConfig = (config: PluginsServerConfig): PluginsServerConfig => { // When running the blob consumer we override a bunch of settings to use the session recording ones if available return { ...config, diff --git a/plugin-server/src/kafka/consumer.ts 
b/plugin-server/src/kafka/consumer.ts index 9de555e117a..f3b3a91d2be 100644 --- a/plugin-server/src/kafka/consumer.ts +++ b/plugin-server/src/kafka/consumer.ts @@ -135,38 +135,54 @@ export const consumeMessages = async (consumer: RdKafkaConsumer, fetchBatchSize: }) }) } -export const commitOffsetsForMessages = (messages: Message[], consumer: RdKafkaConsumer) => { - // Get the offsets for the last message for each partition, from - // messages - const offsets = messages.reduce((acc, message) => { + +export const findOffsetsToCommit = (messages: TopicPartitionOffset[]): TopicPartitionOffset[] => { + // We only need to commit the highest offset for a batch of messages + const messagesByTopicPartition = messages.reduce((acc, message) => { if (!acc[message.topic]) { acc[message.topic] = {} } - if (!acc[message.topic][message.partition.toString()]) { - acc[message.topic][message.partition.toString()] = message.offset - } else if (message.offset > acc[message.topic][message.partition.toString()]) { - acc[message.topic][message.partition.toString()] = message.offset + if (!acc[message.topic][message.partition]) { + acc[message.topic][message.partition] = [] } - return acc - }, {} as { [topic: string]: { [partition: string]: number } }) + acc[message.topic][message.partition].push(message) + + return acc + }, {} as { [topic: string]: { [partition: number]: TopicPartitionOffset[] } }) + + // Then we find the highest offset for each topic partition + const highestOffsets = Object.entries(messagesByTopicPartition).flatMap(([topic, partitions]) => { + return Object.entries(partitions).map(([partition, messages]) => { + const highestOffset = Math.max(...messages.map((message) => message.offset)) - const topicPartitionOffsets = Object.entries(offsets).flatMap(([topic, partitions]) => { - return Object.entries(partitions).map(([partition, offset]) => { return { topic, partition: parseInt(partition), - offset: offset + 1, + offset: highestOffset, } }) }) + return highestOffsets +} + +export const commitOffsetsForMessages = (messages: Message[], consumer: RdKafkaConsumer) => { + const topicPartitionOffsets = findOffsetsToCommit(messages).map((message) => { + return { + ...message, + // When committing to Kafka you commit the offset of the next message you want to consume + offset: message.offset + 1, + } + }) + if (topicPartitionOffsets.length > 0) { status.debug('๐Ÿ“', 'Committing offsets', { topicPartitionOffsets }) consumer.commit(topicPartitionOffsets) } } + export const disconnectConsumer = async (consumer: RdKafkaConsumer) => { await new Promise((resolve, reject) => { consumer.disconnect((error, data) => { diff --git a/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/utils.ts b/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/utils.ts deleted file mode 100644 index 61d1e354fbc..00000000000 --- a/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/utils.ts +++ /dev/null @@ -1,13 +0,0 @@ -import { DateTime } from 'luxon' - -import { IncomingRecordingMessage, PersistedRecordingMessage } from './types' - -export const convertToPersistedMessage = (message: IncomingRecordingMessage): PersistedRecordingMessage => { - return { - window_id: message.window_id, - data: message.events, - } -} - -// Helper to return now as a milliseconds timestamp -export const now = () => DateTime.now().toMillis() diff --git a/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/session-offset-high-water-mark.ts 
b/plugin-server/src/main/ingestion-queues/session-recording/services/offset-high-water-marker.ts similarity index 72% rename from plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/session-offset-high-water-mark.ts rename to plugin-server/src/main/ingestion-queues/session-recording/services/offset-high-water-marker.ts index d2a939248e1..db1bfaeea85 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/session-offset-high-water-mark.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/offset-high-water-marker.ts @@ -10,7 +10,7 @@ export const offsetHighWaterMarkKey = (prefix: string, tp: TopicPartition) => { return `${prefix}/${tp.topic}/${tp.partition}` } -export type SessionOffsetHighWaterMarks = Record +export type OffsetHighWaterMarks = Record /** * If a file is written to S3 we need to know the offset of the last message in that file so that we can @@ -22,10 +22,10 @@ export type SessionOffsetHighWaterMarks = Record * which offsets have been written to S3 for each session, and which haven't * so that we don't re-process those messages. */ -export class SessionOffsetHighWaterMark { +export class OffsetHighWaterMarker { // Watermarks are held in memory and synced back to redis on commit // We don't need to load them more than once per TP as this consumer is the only thing writing to it - private topicPartitionWaterMarks: Record | undefined> = {} + private topicPartitionWaterMarks: Record | undefined> = {} constructor(private redisPool: RedisPool, private keyPrefix = '@posthog/replay/partition-high-water-marks') {} @@ -40,7 +40,7 @@ export class SessionOffsetHighWaterMark { } } - public async getWaterMarks(tp: TopicPartition): Promise { + public async getWaterMarks(tp: TopicPartition): Promise { const key = offsetHighWaterMarkKey(this.keyPrefix, tp) // If we already have a watermark promise then we return it (i.e. we don't want to load the watermarks twice) @@ -50,17 +50,14 @@ export class SessionOffsetHighWaterMark { ).then((redisValue) => { // NOTE: We do this in a secondary promise to release the previous redis client - // redisValue is an array of [sessionId, offset, sessionId, offset, ...] - // we want to convert it to an object of { sessionId: offset, sessionId: offset, ... } - const highWaterMarks = redisValue.reduce( - (acc: SessionOffsetHighWaterMarks, value: string, index: number) => { - if (index % 2 === 0) { - acc[value] = parseInt(redisValue[index + 1]) - } - return acc - }, - {} - ) + // redisValue is an array of [key, offset, key, offset, ...] + // we want to convert it to an object of { key: offset, key: offset, ... } + const highWaterMarks = redisValue.reduce((acc: OffsetHighWaterMarks, value: string, index: number) => { + if (index % 2 === 0) { + acc[value] = parseInt(redisValue[index + 1]) + } + return acc + }, {}) this.topicPartitionWaterMarks[key] = Promise.resolve(highWaterMarks) @@ -71,32 +68,37 @@ export class SessionOffsetHighWaterMark { return this.topicPartitionWaterMarks[key]! } - public async add(tp: TopicPartition, sessionId: string, offset: number): Promise { + public async add(tp: TopicPartition, id: string, offset: number): Promise { const key = offsetHighWaterMarkKey(this.keyPrefix, tp) + const watermarks = await this.getWaterMarks(tp) + + if (offset <= (watermarks[id] ?? 
-1)) { + // SANITY CHECK: We don't want to add an offset that is less than or equal to the current offset + return + } + + // Immediately update the value so any subsequent calls to getWaterMarks will get the latest value + watermarks[id] = offset + this.topicPartitionWaterMarks[key] = Promise.resolve(watermarks) try { await this.run(`write offset high-water mark ${key} `, async (client) => { const returnCountOfUpdatedAndAddedElements = 'CH' - const updatedCount = await client.zadd(key, returnCountOfUpdatedAndAddedElements, offset, sessionId) + const updatedCount = await client.zadd(key, returnCountOfUpdatedAndAddedElements, offset, id) status.info('๐Ÿ“', 'WrittenOffsetCache added high-water mark for partition', { key, ...tp, - sessionId, + id, offset, updatedCount, }) - }).then(async () => { - // NOTE: We do this in a secondary promise to release the previous redis client - const watermarks = await this.getWaterMarks(tp) - watermarks[sessionId] = offset - this.topicPartitionWaterMarks[key] = Promise.resolve(watermarks) }) } catch (error) { status.error('๐Ÿงจ', 'WrittenOffsetCache failed to add high-water mark for partition', { error: error.message, key, ...tp, - sessionId, + id, offset, }) captureException(error, { @@ -106,16 +108,16 @@ export class SessionOffsetHighWaterMark { }, tags: { ...tp, - sessionId, + id, }, }) } } - public async onCommit(tp: TopicPartition, offset: number): Promise { + public async clear(tp: TopicPartition, offset: number): Promise { const key = offsetHighWaterMarkKey(this.keyPrefix, tp) try { - return await this.run(`commit all below offset high-water mark for ${key} `, async (client) => { + return await this.run(`clear all below offset high-water mark for ${key} `, async (client) => { const numberRemoved = await client.zremrangebyscore(key, '-Inf', offset) status.info('๐Ÿ“', 'WrittenOffsetCache committed all below high-water mark for partition', { numberRemoved, @@ -124,9 +126,9 @@ export class SessionOffsetHighWaterMark { }) const watermarks = await this.getWaterMarks(tp) // remove each key in currentHighWaterMarks that has an offset less than or equal to the offset we just committed - Object.entries(watermarks).forEach(([sessionId, value]) => { + Object.entries(watermarks).forEach(([id, value]) => { if (value && value <= offset) { - delete watermarks[sessionId] + delete watermarks[id] } }) @@ -155,10 +157,10 @@ export class SessionOffsetHighWaterMark { * it assumes that it has the latest high-water marks for this topic partition * so that callers are safe to drop messages */ - public async isBelowHighWaterMark(tp: TopicPartition, sessionId: string, offset: number): Promise { + public async isBelowHighWaterMark(tp: TopicPartition, id: string, offset: number): Promise { const highWaterMarks = await this.getWaterMarks(tp) - return offset <= (highWaterMarks[sessionId] ?? -1) + return offset <= (highWaterMarks[id] ?? 
-1) } public revoke(tp: TopicPartition) { diff --git a/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/realtime-manager.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/realtime-manager.ts similarity index 98% rename from plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/realtime-manager.ts rename to plugin-server/src/main/ingestion-queues/session-recording/services/realtime-manager.ts index 89d2ebbe792..9e9effce187 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/realtime-manager.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/realtime-manager.ts @@ -7,8 +7,8 @@ import { PluginsServerConfig, RedisPool } from '../../../../types' import { timeoutGuard } from '../../../../utils/db/utils' import { status } from '../../../../utils/status' import { createRedis } from '../../../../utils/utils' -import { IncomingRecordingMessage } from './types' -import { convertToPersistedMessage } from './utils' +import { IncomingRecordingMessage } from '../types' +import { convertToPersistedMessage } from '../utils' const Keys = { snapshots(teamId: number, suffix: string): string { diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts new file mode 100644 index 00000000000..e115e6e6bf9 --- /dev/null +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts @@ -0,0 +1,187 @@ +import { captureException, captureMessage } from '@sentry/node' +import { randomUUID } from 'crypto' +import { DateTime } from 'luxon' +import { HighLevelProducer as RdKafkaProducer, NumberNullUndefined } from 'node-rdkafka-acosom' + +import { KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS } from '../../../../config/kafka-topics' +import { createRdConnectionConfigFromEnvVars } from '../../../../kafka/config' +import { findOffsetsToCommit } from '../../../../kafka/consumer' +import { retryOnDependencyUnavailableError } from '../../../../kafka/error-handling' +import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../../../kafka/producer' +import { PluginsServerConfig } from '../../../../types' +import { status } from '../../../../utils/status' +import { createSessionReplayEvent } from '../../../../worker/ingestion/process-event' +import { eventDroppedCounter } from '../../metrics' +import { IncomingRecordingMessage } from '../types' +import { OffsetHighWaterMarker } from './offset-high-water-marker' + +const HIGH_WATERMARK_KEY = 'session_replay_events_ingester' + +export class ReplayEventsIngester { + producer?: RdKafkaProducer + + constructor( + private readonly serverConfig: PluginsServerConfig, + private readonly offsetHighWaterMarker: OffsetHighWaterMarker + ) {} + + public async consumeBatch(messages: IncomingRecordingMessage[]) { + const pendingProduceRequests: Promise[] = [] + + for (const message of messages) { + const results = await retryOnDependencyUnavailableError(() => this.consume(message)) + if (results) { + pendingProduceRequests.push(...results) + } + } + + // On each loop, we flush the producer to ensure that all messages + // are sent to Kafka. + try { + await flushProducer(this.producer!) 
+ } catch (error) { + // Rather than handling errors from flush, we instead handle + // errors per produce request, which gives us a little more + // flexibility in terms of deciding if it is a terminal + // error or not. + } + + // We wait on all the produce requests to complete. After the + // flush they should all have been resolved/rejected already. If + // we get an intermittent error, such as a Kafka broker being + // unavailable, we will throw. We are relying on the Producer + // already having handled retries internally. + for (const produceRequest of pendingProduceRequests) { + try { + await produceRequest + } catch (error) { + status.error('๐Ÿ”', 'main_loop_error', { error }) + + if (error?.isRetriable) { + // We assume the if the error is retriable, then we + // are probably in a state where e.g. Kafka is down + // temporarily and we would rather simply throw and + // have the process restarted. + throw error + } + } + } + + const topicPartitionOffsets = findOffsetsToCommit(messages.map((message) => message.metadata)) + await Promise.all( + topicPartitionOffsets.map((tpo) => this.offsetHighWaterMarker.add(tpo, HIGH_WATERMARK_KEY, tpo.offset)) + ) + } + + public async consume(event: IncomingRecordingMessage): Promise[] | void> { + const warn = (text: string, labels: Record = {}) => + status.warn('โš ๏ธ', text, { + offset: event.metadata.offset, + partition: event.metadata.partition, + ...labels, + }) + + const drop = (reason: string, labels: Record = {}) => { + eventDroppedCounter + .labels({ + event_type: 'session_recordings_replay_events', + drop_cause: reason, + }) + .inc() + + warn(reason, { + reason, + ...labels, + }) + } + + if (!this.producer) { + return drop('producer_not_ready') + } + + if (event.replayIngestionConsumer !== 'v2') { + return drop('invalid_event_type') + } + + if ( + await this.offsetHighWaterMarker.isBelowHighWaterMark( + event.metadata, + HIGH_WATERMARK_KEY, + event.metadata.offset + ) + ) { + return drop('high_water_mark') + } + + try { + const replayRecord = createSessionReplayEvent( + randomUUID(), + event.team_id, + event.distinct_id, + event.session_id, + event.events + ) + + try { + // the replay record timestamp has to be valid and be within a reasonable diff from now + if (replayRecord !== null) { + const asDate = DateTime.fromSQL(replayRecord.first_timestamp) + if (!asDate.isValid || Math.abs(asDate.diffNow('months').months) >= 0.99) { + captureMessage(`Invalid replay record timestamp: ${replayRecord.first_timestamp} for event`, { + extra: { + replayRecord, + uuid: replayRecord.uuid, + timestamp: replayRecord.first_timestamp, + }, + tags: { + team: event.team_id, + session_id: replayRecord.session_id, + }, + }) + + return drop('invalid_timestamp') + } + } + } catch (e) { + captureException(e, { + extra: { + replayRecord, + }, + tags: { + team: event.team_id, + session_id: event.session_id, + }, + }) + + return drop('session_replay_summarizer_error') + } + + return [ + produce({ + producer: this.producer, + topic: KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS, + value: Buffer.from(JSON.stringify(replayRecord)), + key: event.session_id, + }), + ] + } catch (error) { + status.error('โš ๏ธ', 'processing_error', { + error: error, + }) + } + } + public async start(): Promise { + const connectionConfig = createRdConnectionConfigFromEnvVars(this.serverConfig) + this.producer = await createKafkaProducer(connectionConfig) + this.producer.connect() + } + + public async stop(): Promise { + status.info('๐Ÿ”', 'ReplayEventsIngester - stopping') + + if 
(this.producer && this.producer.isConnected()) { + status.info('๐Ÿ”', 'ReplayEventsIngester disconnecting kafka producer in batchConsumer stop') + await disconnectProducer(this.producer) + } + } +} diff --git a/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/session-manager.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts similarity index 94% rename from plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/session-manager.ts rename to plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts index 7071b309bc7..a98ef902361 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/session-manager.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts @@ -12,10 +12,9 @@ import { PluginsServerConfig } from '../../../../types' import { asyncTimeoutGuard, timeoutGuard } from '../../../../utils/db/utils' import { status } from '../../../../utils/status' import { ObjectStorage } from '../../../services/object_storage' -import { bufferFileDir } from '../session-recordings-blob-consumer' +import { IncomingRecordingMessage } from '../types' +import { bufferFileDir, convertToPersistedMessage, maxDefined, minDefined, now } from '../utils' import { RealtimeManager } from './realtime-manager' -import { IncomingRecordingMessage } from './types' -import { convertToPersistedMessage, now } from './utils' const BUCKETS_LINES_WRITTEN = [0, 10, 50, 100, 500, 1000, 2000, 5000, 10000, Infinity] const BUCKETS_KB_WRITTEN = [0, 128, 512, 1024, 5120, 10240, 20480, 51200, 102400, 204800, Infinity] @@ -77,8 +76,8 @@ type SessionBuffer = { file: string fileStream: WriteStream offsets: { - lowest: number - highest: number + lowest?: number + highest?: number } eventsRange: { firstTimestamp: number @@ -350,7 +349,9 @@ export class SessionManager { // We want to delete the flush buffer before we proceed so that the onFinish handler doesn't reference it void this.destroyBuffer(this.flushBuffer) this.flushBuffer = undefined - this.onFinish([offsets.lowest, offsets.highest]) + if (offsets.lowest && offsets.highest) { + this.onFinish([offsets.lowest, offsets.highest]) + } } catch (error) { this.captureException(error) } finally { @@ -374,10 +375,7 @@ export class SessionManager { newestKafkaTimestamp: null, file, fileStream: createWriteStream(file, 'utf-8'), - offsets: { - lowest: Infinity, - highest: -Infinity, - }, + offsets: {}, eventsRange: null, } @@ -419,8 +417,8 @@ export class SessionManager { const content = JSON.stringify(messageData) + '\n' this.buffer.count += 1 this.buffer.sizeEstimate += content.length - this.buffer.offsets.lowest = Math.min(this.buffer.offsets.lowest, message.metadata.offset) - this.buffer.offsets.highest = Math.max(this.buffer.offsets.highest, message.metadata.offset) + this.buffer.offsets.lowest = minDefined(this.buffer.offsets.lowest, message.metadata.offset) + this.buffer.offsets.highest = maxDefined(this.buffer.offsets.highest, message.metadata.offset) if (this.realtime) { // We don't care about the response here as it is an optimistic call @@ -435,7 +433,7 @@ export class SessionManager { } private setEventsRangeFrom(message: IncomingRecordingMessage) { const start = message.events.at(0)?.timestamp - const end = message.events.at(-1)?.timestamp + const end = message.events.at(-1)?.timestamp ?? 
start if (!start || !end) { captureMessage( @@ -451,8 +449,8 @@ export class SessionManager { return } - const firstTimestamp = Math.min(start, this.buffer.eventsRange?.firstTimestamp || Infinity) - const lastTimestamp = Math.max(end || start, this.buffer.eventsRange?.lastTimestamp || -Infinity) + const firstTimestamp = minDefined(start, this.buffer.eventsRange?.firstTimestamp) ?? start + const lastTimestamp = maxDefined(end, this.buffer.eventsRange?.lastTimestamp) ?? end this.buffer.eventsRange = { firstTimestamp, lastTimestamp } } @@ -504,10 +502,7 @@ export class SessionManager { } public getLowestOffset(): number | null { - if (this.buffer.count === 0) { - return null - } - return Math.min(this.buffer.offsets.lowest, this.flushBuffer?.offsets.lowest ?? Infinity) + return minDefined(this.buffer.offsets.lowest, this.flushBuffer?.offsets.lowest) ?? null } private async destroyBuffer(buffer: SessionBuffer): Promise { diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v1.ts similarity index 95% rename from plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts rename to plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v1.ts index 83bfaeaea1f..533015218e1 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v1.ts @@ -27,7 +27,7 @@ import { TeamManager } from '../../../worker/ingestion/team-manager' import { parseEventTimestamp } from '../../../worker/ingestion/timestamps' import { eventDroppedCounter } from '../metrics' -export const startSessionRecordingEventsConsumer = async ({ +export const startSessionRecordingEventsConsumerV1 = async ({ teamManager, kafkaConfig, consumerMaxBytes, @@ -273,19 +273,23 @@ const eachMessage = team.id, messagePayload.distinct_id, parseEventTimestamp(event as PluginEvent), - event.ip, event.properties || {} ) let replayRecord: null | SummarizedSessionRecordingEvent = null try { - replayRecord = createSessionReplayEvent( - messagePayload.uuid, - team.id, - messagePayload.distinct_id, - event.ip, - event.properties || {} - ) + const properties = event.properties || {} + const shouldCreateReplayEvents = (properties['$snapshot_consumer'] ?? 
'v1') === 'v1' + + if (shouldCreateReplayEvents && properties.$snapshot_data?.events_summary.length) { + replayRecord = createSessionReplayEvent( + messagePayload.uuid, + team.id, + messagePayload.distinct_id, + properties['$session_id'], + properties.$snapshot_data?.events_summary || [] + ) + } // the replay record timestamp has to be valid and be within a reasonable diff from now if (replayRecord !== null) { const asDate = DateTime.fromSQL(replayRecord.first_timestamp) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-blob-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts similarity index 85% rename from plugin-server/src/main/ingestion-queues/session-recording/session-recordings-blob-consumer.ts rename to plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts index 6aeb1e6ce00..72eadba38bf 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-blob-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts @@ -1,22 +1,14 @@ import * as Sentry from '@sentry/node' import { captureException } from '@sentry/node' import { mkdirSync, rmSync } from 'node:fs' -import { - CODES, - features, - HighLevelProducer as RdKafkaProducer, - librdkafkaVersion, - Message, - TopicPartition, -} from 'node-rdkafka-acosom' -import path from 'path' +import { CODES, features, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka-acosom' import { Pool } from 'pg' import { Counter, Gauge, Histogram } from 'prom-client' +import { sessionRecordingConsumerConfig } from '../../../config/config' import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics' import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer' import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config' -import { createKafkaProducer, disconnectProducer } from '../../../kafka/producer' import { PipelineEvent, PluginsServerConfig, RawEventMessage, RedisPool, TeamId } from '../../../types' import { BackgroundRefresher } from '../../../utils/background-refresher' import { status } from '../../../utils/status' @@ -24,11 +16,12 @@ import { fetchTeamTokensWithRecordings } from '../../../worker/ingestion/team-ma import { ObjectStorage } from '../../services/object_storage' import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics' import { eventDroppedCounter } from '../metrics' -import { RealtimeManager } from './blob-ingester/realtime-manager' -import { SessionManager } from './blob-ingester/session-manager' -import { SessionOffsetHighWaterMark } from './blob-ingester/session-offset-high-water-mark' -import { IncomingRecordingMessage } from './blob-ingester/types' -import { now } from './blob-ingester/utils' +import { OffsetHighWaterMarker } from './services/offset-high-water-marker' +import { RealtimeManager } from './services/realtime-manager' +import { ReplayEventsIngester } from './services/replay-events-ingester' +import { SessionManager } from './services/session-manager' +import { IncomingRecordingMessage } from './types' +import { bufferFileDir, now } from './utils' // Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals require('@sentry/tracing') @@ -37,8 +30,6 @@ const groupId = 'session-recordings-blob' const sessionTimeout = 30000 const flushIntervalTimeoutMs = 30000 -export const 
bufferFileDir = (root: string) => path.join(root, 'session-buffer-files') - const gaugeSessionsHandled = new Gauge({ name: 'recording_blob_ingestion_session_manager_count', help: 'A gauge of the number of sessions being handled by this blob ingestion consumer', @@ -84,12 +75,12 @@ const counterKafkaMessageReceived = new Counter({ labelNames: ['partition'], }) -export class SessionRecordingBlobIngester { +export class SessionRecordingIngesterV2 { sessions: Record = {} - sessionOffsetHighWaterMark?: SessionOffsetHighWaterMark + offsetHighWaterMarker: OffsetHighWaterMarker realtimeManager: RealtimeManager + replayEventsIngester: ReplayEventsIngester batchConsumer?: BatchConsumer - producer?: RdKafkaProducer flushInterval: NodeJS.Timer | null = null // the time at the most recent message of a particular partition partitionNow: Record = {} @@ -104,12 +95,12 @@ export class SessionRecordingBlobIngester { ) { this.realtimeManager = new RealtimeManager(this.redisPool, this.serverConfig) - if (this.serverConfig.SESSION_RECORDING_ENABLE_OFFSET_HIGH_WATER_MARK_PROCESSING) { - this.sessionOffsetHighWaterMark = new SessionOffsetHighWaterMark( - this.redisPool, - serverConfig.SESSION_RECORDING_REDIS_OFFSET_STORAGE_KEY - ) - } + this.offsetHighWaterMarker = new OffsetHighWaterMarker( + this.redisPool, + serverConfig.SESSION_RECORDING_REDIS_OFFSET_STORAGE_KEY + ) + + this.replayEventsIngester = new ReplayEventsIngester(this.serverConfig, this.offsetHighWaterMarker) this.teamsRefresher = new BackgroundRefresher(async () => { try { @@ -150,7 +141,7 @@ export class SessionRecordingBlobIngester { op: 'checkHighWaterMark', }) - if (await this.sessionOffsetHighWaterMark?.isBelowHighWaterMark({ topic, partition }, session_id, offset)) { + if (await this.offsetHighWaterMarker.isBelowHighWaterMark({ topic, partition }, session_id, offset)) { eventDroppedCounter .labels({ event_type: 'session_recordings_blob_ingestion', @@ -180,7 +171,7 @@ export class SessionRecordingBlobIngester { this.commitOffsets(topic, partition, session_id, offsets) // We don't want to block if anything fails here. Watermarks are best effort - void this.sessionOffsetHighWaterMark?.add({ topic, partition }, session_id, offsets.slice(-1)[0]) + void this.offsetHighWaterMarker.add({ topic, partition }, session_id, offsets.slice(-1)[0]) } ) @@ -199,7 +190,7 @@ export class SessionRecordingBlobIngester { // If it is recoverable, we probably want to retry? } - public async handleKafkaMessage(message: Message, span?: Sentry.Span): Promise { + public async parseKafkaMessage(message: Message): Promise { const statusWarn = (reason: string, extra?: Record) => { status.warn('โš ๏ธ', 'invalid_message', { reason, @@ -236,13 +227,9 @@ export class SessionRecordingBlobIngester { let teamId: TeamId | null = null const token = messagePayload.token - const teamSpan = span?.startChild({ - op: 'fetchTeam', - }) if (token) { teamId = await this.teamsRefresher.get().then((teams) => teams[token] || null) } - teamSpan?.finish() if (teamId == null) { eventDroppedCounter @@ -267,17 +254,14 @@ export class SessionRecordingBlobIngester { }, team_id: teamId, - distinct_id: event.distinct_id, + distinct_id: event.properties.distinct_id, session_id: event.properties?.$session_id, window_id: event.properties?.$window_id, events: event.properties.$snapshot_items, + replayIngestionConsumer: event.properties?.$snapshot_consumer ?? 
'v1', } - const consumeSpan = span?.startChild({ - op: 'consume', - }) - await this.consume(recordingMessage, consumeSpan) - consumeSpan?.finish() + return recordingMessage } private async handleEachBatch(messages: Message[]): Promise { @@ -285,15 +269,20 @@ export class SessionRecordingBlobIngester { histogramKafkaBatchSize.observe(messages.length) - await Promise.all( - messages.map(async (message) => { - const childSpan = transaction.startChild({ - op: 'handleKafkaMessage', - }) - await this.handleKafkaMessage(message, childSpan) - childSpan.finish() + const recordingMessages: IncomingRecordingMessage[] = ( + await Promise.all(messages.map((m) => this.parseKafkaMessage(m))) + ).filter((message) => message) as IncomingRecordingMessage[] + + for (const message of recordingMessages) { + const consumeSpan = transaction?.startChild({ + op: 'blobConsume', }) - ) + + await this.consume(message, consumeSpan) + consumeSpan?.finish() + } + + await this.replayEventsIngester.consumeBatch(recordingMessages) transaction.finish() } @@ -317,11 +306,14 @@ export class SessionRecordingBlobIngester { // Load teams into memory await this.teamsRefresher.refresh() + await this.replayEventsIngester.start() + const connectionConfig = createRdConnectionConfigFromEnvVars(this.serverConfig) - this.producer = await createKafkaProducer(connectionConfig) // Create a node-rdkafka consumer that fetches batches of messages, runs // eachBatchWithContext, then commits offsets for the batch. + + const recordingConsumerConfig = sessionRecordingConsumerConfig(this.serverConfig) this.batchConsumer = await startBatchConsumer({ connectionConfig, groupId, @@ -330,14 +322,14 @@ export class SessionRecordingBlobIngester { // the largest size of a message that can be fetched by the consumer. 
// the largest size our MSK cluster allows is 20MB // we only use 9 or 10MB but there's no reason to limit this ๐Ÿคท๏ธ - consumerMaxBytes: this.serverConfig.KAFKA_CONSUMPTION_MAX_BYTES, - consumerMaxBytesPerPartition: this.serverConfig.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION, + consumerMaxBytes: recordingConsumerConfig.KAFKA_CONSUMPTION_MAX_BYTES, + consumerMaxBytesPerPartition: recordingConsumerConfig.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION, // our messages are very big, so we don't want to buffer too many - queuedMinMessages: this.serverConfig.SESSION_RECORDING_KAFKA_QUEUE_SIZE, - consumerMaxWaitMs: this.serverConfig.KAFKA_CONSUMPTION_MAX_WAIT_MS, - consumerErrorBackoffMs: this.serverConfig.KAFKA_CONSUMPTION_ERROR_BACKOFF_MS, - fetchBatchSize: this.serverConfig.SESSION_RECORDING_KAFKA_BATCH_SIZE, - batchingTimeoutMs: this.serverConfig.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS, + queuedMinMessages: recordingConsumerConfig.SESSION_RECORDING_KAFKA_QUEUE_SIZE, + consumerMaxWaitMs: recordingConsumerConfig.KAFKA_CONSUMPTION_MAX_WAIT_MS, + consumerErrorBackoffMs: recordingConsumerConfig.KAFKA_CONSUMPTION_ERROR_BACKOFF_MS, + fetchBatchSize: recordingConsumerConfig.SESSION_RECORDING_KAFKA_BATCH_SIZE, + batchingTimeoutMs: recordingConsumerConfig.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS, autoCommit: false, eachBatch: async (messages) => { return await this.handleEachBatch(messages) @@ -388,7 +380,7 @@ export class SessionRecordingBlobIngester { gaugeLagMilliseconds.remove({ partition }) gaugeOffsetCommitted.remove({ partition }) gaugeOffsetCommitFailed.remove({ partition }) - this.sessionOffsetHighWaterMark?.revoke(topicPartition) + this.offsetHighWaterMarker.revoke(topicPartition) this.partitionNow[partition] = null this.partitionLastKnownCommit[partition] = null }) @@ -402,14 +394,8 @@ export class SessionRecordingBlobIngester { }) // Make sure to disconnect the producer after we've finished consuming. 
- this.batchConsumer.join().finally(async () => { - if (this.producer && this.producer.isConnected()) { - status.debug( - '๐Ÿ”', - 'blob_ingester_consumer disconnecting kafka producer in session recordings batchConsumer finally' - ) - await disconnectProducer(this.producer) - } + this.batchConsumer.join().finally(() => { + status.debug('๐Ÿ”', 'blob_ingester_consumer - batch consumer has finished') }) this.batchConsumer.consumer.on('disconnected', async (err) => { @@ -469,11 +455,7 @@ export class SessionRecordingBlobIngester { } await this.realtimeManager.unsubscribe() - - if (this.producer && this.producer.isConnected()) { - status.info('๐Ÿ”', 'blob_ingester_consumer disconnecting kafka producer in batchConsumer stop') - await disconnectProducer(this.producer) - } + await this.replayEventsIngester.stop() await this.batchConsumer?.stop() // This is inefficient but currently necessary due to new instances restarting from the committed offset point @@ -540,7 +522,7 @@ export class SessionRecordingBlobIngester { offsetToCommit: highestOffsetToCommit, }) - void this.sessionOffsetHighWaterMark?.onCommit({ topic, partition }, highestOffsetToCommit) + void this.offsetHighWaterMarker.clear({ topic, partition }, highestOffsetToCommit) try { this.batchConsumer?.consumer.commit({ diff --git a/plugin-server/src/main/ingestion-queues/session-recording/snapshot-segmenter.ts b/plugin-server/src/main/ingestion-queues/session-recording/snapshot-segmenter.ts index f99e37b5358..f05a58cd53a 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/snapshot-segmenter.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/snapshot-segmenter.ts @@ -6,38 +6,32 @@ * Any changes may need to be sync'd between the two */ +import { RRWebEvent } from '../../../types' + const activeSources = [1, 2, 3, 4, 5, 6, 7, 12] const ACTIVITY_THRESHOLD_MS = 5000 -export interface RRWebEventSummaryData { +export interface RRWebPartialData { href?: string source?: number payload?: Record plugin?: string } -export interface RRWebEventSummary { - timestamp: number - type: number - data: RRWebEventSummaryData - windowId: string -} - interface RecordingSegment { kind: 'window' | 'buffer' | 'gap' startTimestamp: number // Epoch time that the segment starts endTimestamp: number // Epoch time that the segment ends durationMs: number - windowId?: string isActive: boolean } -const isActiveEvent = (event: RRWebEventSummary): boolean => { +const isActiveEvent = (event: RRWebEvent): boolean => { return event.type === 3 && activeSources.includes(event.data?.source || -1) } -const createSegments = (snapshots: RRWebEventSummary[]): RecordingSegment[] => { +const createSegments = (snapshots: RRWebEvent[]): RecordingSegment[] => { let segments: RecordingSegment[] = [] let activeSegment!: Partial let lastActiveEventTimestamp = 0 @@ -60,11 +54,6 @@ const createSegments = (snapshots: RRWebEventSummary[]): RecordingSegment[] => { isNewSegment = true } - // 4. 
If windowId changes we create a new segment - if (activeSegment?.windowId !== snapshot.windowId) { - isNewSegment = true - } - if (isNewSegment) { if (activeSegment) { segments.push(activeSegment as RecordingSegment) @@ -73,7 +62,6 @@ const createSegments = (snapshots: RRWebEventSummary[]): RecordingSegment[] => { activeSegment = { kind: 'window', startTimestamp: snapshot.timestamp, - windowId: snapshot.windowId, isActive: eventIsActive, } } @@ -98,7 +86,7 @@ const createSegments = (snapshots: RRWebEventSummary[]): RecordingSegment[] => { * TODO add code sharing between plugin-server and front-end so that this method can * call the same createSegments function as the front-end */ -export const activeMilliseconds = (snapshots: RRWebEventSummary[]): number => { +export const activeMilliseconds = (snapshots: RRWebEvent[]): number => { const segments = createSegments(snapshots) return segments.reduce((acc, segment) => { if (segment.isActive) { diff --git a/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/types.ts b/plugin-server/src/main/ingestion-queues/session-recording/types.ts similarity index 58% rename from plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/types.ts rename to plugin-server/src/main/ingestion-queues/session-recording/types.ts index 3cf8e6dbe03..c29c1ad81f1 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/types.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/types.ts @@ -1,16 +1,11 @@ // This is the incoming message from Kafka -export type RRWebEvent = Record & { - timestamp: number - type: number - data: any -} +import { TopicPartitionOffset } from 'node-rdkafka-acosom' + +import { RRWebEvent } from '../../../types' export type IncomingRecordingMessage = { - metadata: { - topic: string - partition: number - offset: number + metadata: TopicPartitionOffset & { timestamp: number } @@ -19,6 +14,8 @@ export type IncomingRecordingMessage = { session_id: string window_id?: string events: RRWebEvent[] + // NOTE: This is only for migrating from one consumer to the other + replayIngestionConsumer: 'v1' | 'v2' } // This is the incoming message from Kafka diff --git a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts new file mode 100644 index 00000000000..5ef9dace47c --- /dev/null +++ b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts @@ -0,0 +1,26 @@ +import { DateTime } from 'luxon' +import path from 'path' + +import { IncomingRecordingMessage, PersistedRecordingMessage } from './types' + +export const convertToPersistedMessage = (message: IncomingRecordingMessage): PersistedRecordingMessage => { + return { + window_id: message.window_id, + data: message.events, + } +} + +// Helper to return now as a milliseconds timestamp +export const now = () => DateTime.now().toMillis() + +export const minDefined = (...args: (number | undefined)[]): number | undefined => { + const definedArgs = args.filter((arg) => arg !== undefined) as number[] + return Math.min(...definedArgs) +} + +export const maxDefined = (...args: (number | undefined)[]): number | undefined => { + const definedArgs = args.filter((arg) => arg !== undefined) as number[] + return Math.max(...definedArgs) +} + +export const bufferFileDir = (root: string) => path.join(root, 'session-buffer-files') diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index 380ada9df22..5aca2606b92 
100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -7,7 +7,7 @@ import * as schedule from 'node-schedule' import { Counter } from 'prom-client' import { getPluginServerCapabilities } from '../capabilities' -import { defaultConfig, sessionRecordingBlobConsumerConfig } from '../config/config' +import { defaultConfig, sessionRecordingConsumerConfig } from '../config/config' import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types' import { createHub, createKafkaClient, createStatsdClient } from '../utils/db/hub' import { captureEventLoopMetrics } from '../utils/metrics' @@ -31,8 +31,8 @@ import { startAsyncWebhooksHandlerConsumer, } from './ingestion-queues/on-event-handler-consumer' import { startScheduledTasksConsumer } from './ingestion-queues/scheduled-tasks-consumer' -import { SessionRecordingBlobIngester } from './ingestion-queues/session-recording/session-recordings-blob-consumer' -import { startSessionRecordingEventsConsumer } from './ingestion-queues/session-recording/session-recordings-consumer' +import { startSessionRecordingEventsConsumerV1 } from './ingestion-queues/session-recording/session-recordings-consumer-v1' +import { SessionRecordingIngesterV2 } from './ingestion-queues/session-recording/session-recordings-consumer-v2' import { createHttpServer } from './services/http-server' import { getObjectStorage } from './services/object_storage' @@ -393,7 +393,7 @@ export async function startPluginsServer( stop, isHealthy: isSessionRecordingsHealthy, join, - } = await startSessionRecordingEventsConsumer({ + } = await startSessionRecordingEventsConsumerV1({ teamManager: teamManager, kafkaConfig: serverConfig, consumerMaxBytes: serverConfig.KAFKA_CONSUMPTION_MAX_BYTES, @@ -408,17 +408,20 @@ export async function startPluginsServer( } if (capabilities.sessionRecordingBlobIngestion) { - const blobServerConfig = sessionRecordingBlobConsumerConfig(serverConfig) - const postgres = hub?.postgres ?? createPostgresPool(blobServerConfig.DATABASE_URL) - const s3 = hub?.objectStorage ?? getObjectStorage(blobServerConfig) - const redisPool = hub?.db.redisPool ?? createRedisPool(blobServerConfig) + const recordingConsumerConfig = sessionRecordingConsumerConfig(serverConfig) + const postgres = hub?.postgres ?? createPostgresPool(recordingConsumerConfig.DATABASE_URL) + const s3 = hub?.objectStorage ?? getObjectStorage(recordingConsumerConfig) + const redisPool = hub?.db.redisPool ?? createRedisPool(recordingConsumerConfig) if (!s3) { throw new Error("Can't start session recording blob ingestion without object storage") } - const ingester = new SessionRecordingBlobIngester(blobServerConfig, postgres, s3, redisPool) + // NOTE: We intentionally pass in the original serverConfig as the ingester uses both kafkas + const ingester = new SessionRecordingIngesterV2(serverConfig, postgres, s3, redisPool) await ingester.start() + const batchConsumer = ingester.batchConsumer + if (batchConsumer) { stopSessionRecordingBlobConsumer = async () => { // Tricky - in some cases the hub is responsible, in which case it will drain and clear. Otherwise we are responsible. 
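Note on the v1/v2 migration routing (illustrative, not part of the patch): every `$snapshot` event is handled by exactly one replay-summary writer, selected by the `$snapshot_consumer` property. The sketch below restates that contract as it appears in this diff — the v1 consumer only summarises events tagged 'v1' (or untagged), while the v2 `ReplayEventsIngester` drops anything not tagged 'v2'. The `RawSnapshotEvent` type and `routeReplaySummary` helper are invented here for illustration only; the assumption that capture sets `$snapshot_consumer` according to `REPLAY_EVENTS_NEW_CONSUMER_RATIO` is inferred from the diffstat and launch.json change, not shown in this section.

// Minimal TypeScript sketch of the routing rule this PR introduces (illustrative only).
// Assumption: capture tags events with $snapshot_consumer based on
// REPLAY_EVENTS_NEW_CONSUMER_RATIO; events without the tag default to 'v1'.
type ReplayConsumer = 'v1' | 'v2'

interface RawSnapshotEvent {
    // hypothetical shape, just enough for this example
    properties: { $snapshot_consumer?: ReplayConsumer; $session_id: string }
}

function routeReplaySummary(event: RawSnapshotEvent): ReplayConsumer {
    // Mirrors the checks visible in the diff:
    //  - v1: (properties['$snapshot_consumer'] ?? 'v1') === 'v1'
    //  - v2: ReplayEventsIngester.consume drops when replayIngestionConsumer !== 'v2'
    return (event.properties.$snapshot_consumer ?? 'v1') === 'v2' ? 'v2' : 'v1'
}

// Example: an untagged event is still summarised by the v1 consumer, so the two
// consumers never double-write a session_replay_events row for the same event.
console.log(routeReplaySummary({ properties: { $session_id: 's1' } })) // 'v1'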
diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 866881dba51..bbaca7a114c 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -197,7 +197,6 @@ export interface PluginsServerConfig { EVENT_OVERFLOW_BUCKET_REPLENISH_RATE: number CLOUD_DEPLOYMENT: string - SESSION_RECORDING_ENABLE_OFFSET_HIGH_WATER_MARK_PROCESSING: boolean // local directory might be a volume mount or a directory on disk (e.g. in local dev) SESSION_RECORDING_LOCAL_DIRECTORY: string SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS: number @@ -1130,3 +1129,9 @@ export interface PipelineEvent extends Omit { } export type RedisPool = GenericPool + +export type RRWebEvent = Record & { + timestamp: number + type: number + data: any +} diff --git a/plugin-server/src/worker/ingestion/process-event.ts b/plugin-server/src/worker/ingestion/process-event.ts index 68269d88f3f..a236e8e49a1 100644 --- a/plugin-server/src/worker/ingestion/process-event.ts +++ b/plugin-server/src/worker/ingestion/process-event.ts @@ -3,7 +3,7 @@ import { PluginEvent, Properties } from '@posthog/plugin-scaffold' import * as Sentry from '@sentry/node' import { DateTime } from 'luxon' -import { activeMilliseconds, RRWebEventSummary } from '../../main/ingestion-queues/session-recording/snapshot-segmenter' +import { activeMilliseconds } from '../../main/ingestion-queues/session-recording/snapshot-segmenter' import { Element, GroupTypeIndex, @@ -15,6 +15,7 @@ import { RawClickHouseEvent, RawPerformanceEvent, RawSessionRecordingEvent, + RRWebEvent, Team, TimestampFormat, } from '../../types' @@ -241,7 +242,6 @@ export const createSessionRecordingEvent = ( team_id: number, distinct_id: string, timestamp: DateTime, - ip: string | null, properties: Properties ) => { const timestampString = castTimestampOrNow(timestamp, TimestampFormat.ClickHouse) @@ -282,33 +282,19 @@ export const createSessionReplayEvent = ( uuid: string, team_id: number, distinct_id: string, - ip: string | null, - properties: Properties + session_id: string, + events: RRWebEvent[] ) => { - const chunkIndex = properties['$snapshot_data']?.chunk_index - - // only the first chunk has the eventsSummary - // we can ignore subsequent chunks for calculating a replay event - if (chunkIndex > 0) { - return null - } - - const eventsSummaries: RRWebEventSummary[] = properties['$snapshot_data']?.['events_summary'] || [] - - const timestamps = eventsSummaries - .filter((eventSummary: RRWebEventSummary) => { - return !!eventSummary?.timestamp - }) - .map((eventSummary: RRWebEventSummary) => { - return castTimestampOrNow(DateTime.fromMillis(eventSummary.timestamp), TimestampFormat.ClickHouse) - }) + const timestamps = events + .filter((e) => !!e?.timestamp) + .map((e) => castTimestampOrNow(DateTime.fromMillis(e.timestamp), TimestampFormat.ClickHouse)) .sort() // but every event where chunk index = 0 must have an eventsSummary - if (eventsSummaries.length === 0 || timestamps.length === 0) { + if (events.length === 0 || timestamps.length === 0) { status.warn('๐Ÿ™ˆ', 'ignoring an empty session recording event', { - session_id: properties['$session_id'], - properties: properties, + session_id, + events, }) // it is safe to throw here as it caught a level up so that we can see this happening in Sentry throw new Error('ignoring an empty session recording event') @@ -321,21 +307,21 @@ export const createSessionReplayEvent = ( let consoleWarnCount = 0 let consoleErrorCount = 0 let url: string | undefined = undefined - eventsSummaries.forEach((eventSummary: RRWebEventSummary) => { - 
if (eventSummary.type === 3) { + events.forEach((event) => { + if (event.type === 3) { mouseActivity += 1 - if (eventSummary.data?.source === 2) { + if (event.data?.source === 2) { clickCount += 1 } - if (eventSummary.data?.source === 5) { + if (event.data?.source === 5) { keypressCount += 1 } } - if (!!eventSummary.data?.href?.trim().length && url === undefined) { - url = eventSummary.data.href + if (!!event.data?.href?.trim().length && url === undefined) { + url = event.data.href } - if (eventSummary.type === 6 && eventSummary.data?.plugin === 'rrweb/console@1') { - const level = eventSummary.data.payload?.level + if (event.type === 6 && event.data?.plugin === 'rrweb/console@1') { + const level = event.data.payload?.level if (level === 'log') { consoleLogCount += 1 } else if (level === 'warn') { @@ -346,13 +332,13 @@ export const createSessionReplayEvent = ( } }) - const activeTime = activeMilliseconds(eventsSummaries) + const activeTime = activeMilliseconds(events) const data: SummarizedSessionRecordingEvent = { uuid, team_id: team_id, distinct_id: distinct_id, - session_id: properties['$session_id'], + session_id: session_id, first_timestamp: timestamps[0], last_timestamp: timestamps[timestamps.length - 1], click_count: clickCount, @@ -363,7 +349,7 @@ export const createSessionReplayEvent = ( console_log_count: consoleLogCount, console_warn_count: consoleWarnCount, console_error_count: consoleErrorCount, - size: Buffer.byteLength(JSON.stringify(properties), 'utf8'), + size: Buffer.byteLength(JSON.stringify(events), 'utf8'), } return data diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts b/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts index 992c0b24a41..7bdb5d749e7 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts @@ -1,4 +1,4 @@ -import { IncomingRecordingMessage } from '../../../../src/main/ingestion-queues/session-recording/blob-ingester/types' +import { IncomingRecordingMessage } from '../../../../src/main/ingestion-queues/session-recording/types' import jsonFullSnapshot from './data/snapshot-full.json' export function createIncomingRecordingMessage( @@ -16,6 +16,7 @@ export function createIncomingRecordingMessage( session_id: 'session_id_1', window_id: 'window_id_1', events: [{ ...jsonFullSnapshot }], + replayIngestionConsumer: 'v2', ...partialIncomingMessage, metadata: { diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/blob-ingester/session-offset-high-water-mark.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/services/offset-high-water-mark.test.ts similarity index 73% rename from plugin-server/tests/main/ingestion-queues/session-recording/blob-ingester/session-offset-high-water-mark.test.ts rename to plugin-server/tests/main/ingestion-queues/session-recording/services/offset-high-water-mark.test.ts index 287884f47ce..6f5493de3ac 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/blob-ingester/session-offset-high-water-mark.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/services/offset-high-water-mark.test.ts @@ -1,10 +1,10 @@ import { TopicPartition } from 'kafkajs' import { + OffsetHighWaterMarker, offsetHighWaterMarkKey, - SessionOffsetHighWaterMark, - SessionOffsetHighWaterMarks, -} from '../../../../../src/main/ingestion-queues/session-recording/blob-ingester/session-offset-high-water-mark' + 
OffsetHighWaterMarks, +} from '../../../../../src/main/ingestion-queues/session-recording/services/offset-high-water-marker' import { Hub } from '../../../../../src/types' import { createHub } from '../../../../../src/utils/db/hub' @@ -13,7 +13,7 @@ describe('session offset high-water mark', () => { let hub: Hub let closeHub: () => Promise const keyPrefix = 'test-high-water-mark' - let sessionOffsetHighWaterMark: SessionOffsetHighWaterMark + let offsetHighWaterMarker: OffsetHighWaterMarker async function deletePrefixedKeys() { const redisClient = await hub.redisPool.acquire() @@ -32,7 +32,7 @@ describe('session offset high-water mark', () => { const redisValue = await client.zrange(key, 0, -1, 'WITHSCORES') await hub.redisPool.release(client) - return redisValue.reduce((acc: SessionOffsetHighWaterMarks, value: string, index: number) => { + return redisValue.reduce((acc: OffsetHighWaterMarks, value: string, index: number) => { if (index % 2 === 0) { acc[value] = parseInt(redisValue[index + 1]) } @@ -42,7 +42,7 @@ describe('session offset high-water mark', () => { beforeEach(async () => { ;[hub, closeHub] = await createHub() - sessionOffsetHighWaterMark = new SessionOffsetHighWaterMark(hub.redisPool, keyPrefix) + offsetHighWaterMarker = new OffsetHighWaterMarker(hub.redisPool, keyPrefix) }) afterEach(async () => { @@ -51,18 +51,18 @@ describe('session offset high-water mark', () => { }) const expectMemoryAndRedisToEqual = async (tp: TopicPartition, toEqual: any) => { - expect(await sessionOffsetHighWaterMark.getWaterMarks(tp)).toEqual(toEqual) + expect(await offsetHighWaterMarker.getWaterMarks(tp)).toEqual(toEqual) expect(await getWaterMarksFromRedis(tp)).toEqual(toEqual) } describe('with no existing high-water marks', () => { it('can remove all high-water marks based on a given offset', async () => { - await sessionOffsetHighWaterMark.onCommit({ topic: 'topic', partition: 1 }, 12) + await offsetHighWaterMarker.clear({ topic: 'topic', partition: 1 }, 12) await expectMemoryAndRedisToEqual({ topic: 'topic', partition: 1 }, {}) }) it('can add a high-water mark', async () => { - await sessionOffsetHighWaterMark.add({ topic: 'topic', partition: 1 }, 'some-session', 123) + await offsetHighWaterMarker.add({ topic: 'topic', partition: 1 }, 'some-session', 123) await expectMemoryAndRedisToEqual( { topic: 'topic', partition: 1 }, { @@ -73,8 +73,8 @@ describe('session offset high-water mark', () => { it('can get multiple watermarks without clashes', async () => { const results = await Promise.all([ - sessionOffsetHighWaterMark.getWaterMarks({ topic: 'topic', partition: 1 }), - sessionOffsetHighWaterMark.getWaterMarks({ topic: 'topic', partition: 2 }), + offsetHighWaterMarker.getWaterMarks({ topic: 'topic', partition: 1 }), + offsetHighWaterMarker.getWaterMarks({ topic: 'topic', partition: 2 }), ]) expect(results).toEqual([{}, {}]) @@ -82,9 +82,9 @@ describe('session offset high-water mark', () => { it('can add multiple high-water marks in parallel', async () => { await Promise.all([ - sessionOffsetHighWaterMark.add({ topic: 'topic', partition: 1 }, 'some-session', 10), - sessionOffsetHighWaterMark.add({ topic: 'topic', partition: 1 }, 'some-session2', 20), - sessionOffsetHighWaterMark.add({ topic: 'topic', partition: 2 }, 'some-session3', 30), + offsetHighWaterMarker.add({ topic: 'topic', partition: 1 }, 'some-session', 10), + offsetHighWaterMarker.add({ topic: 'topic', partition: 1 }, 'some-session2', 20), + offsetHighWaterMarker.add({ topic: 'topic', partition: 2 }, 'some-session3', 30), ]) await 
expectMemoryAndRedisToEqual( @@ -107,11 +107,11 @@ describe('session offset high-water mark', () => { describe('with existing high-water marks', () => { beforeEach(async () => { // works even before anything is written to redis - expect(await sessionOffsetHighWaterMark.getWaterMarks({ topic: 'topic', partition: 1 })).toStrictEqual({}) + expect(await offsetHighWaterMarker.getWaterMarks({ topic: 'topic', partition: 1 })).toStrictEqual({}) - await sessionOffsetHighWaterMark.add({ topic: 'topic', partition: 1 }, 'some-session', 123) - await sessionOffsetHighWaterMark.add({ topic: 'topic', partition: 1 }, 'another-session', 12) - await sessionOffsetHighWaterMark.add({ topic: 'topic', partition: 2 }, 'a-third-session', 1) + await offsetHighWaterMarker.add({ topic: 'topic', partition: 1 }, 'some-session', 123) + await offsetHighWaterMarker.add({ topic: 'topic', partition: 1 }, 'another-session', 12) + await offsetHighWaterMarker.add({ topic: 'topic', partition: 2 }, 'a-third-session', 1) }) it('can get high-water marks for all sessions for a partition', async () => { @@ -125,7 +125,7 @@ describe('session offset high-water mark', () => { }) it('can remove all high-water marks based on a given offset', async () => { - await sessionOffsetHighWaterMark.onCommit({ topic: 'topic', partition: 1 }, 12) + await offsetHighWaterMarker.clear({ topic: 'topic', partition: 1 }, 12) // the commit updates redis // removes all high-water marks that are <= 12 @@ -156,7 +156,7 @@ describe('session offset high-water mark', () => { await Promise.allSettled( partitionOneTestCases.map(async ([offset, expected]) => { expect( - await sessionOffsetHighWaterMark.isBelowHighWaterMark( + await offsetHighWaterMarker.isBelowHighWaterMark( { topic: 'topic', partition: 1 }, 'some-session', offset @@ -169,7 +169,7 @@ describe('session offset high-water mark', () => { it('can check if an offset is below the high-water mark even if we have never seen it before', async () => { // there is nothing for a partition? 
we are always below the high-water mark expect( - await sessionOffsetHighWaterMark.isBelowHighWaterMark( + await offsetHighWaterMarker.isBelowHighWaterMark( { topic: 'topic', partition: 1 }, 'anything we did not add yet', 5432 diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/blob-ingester/session-manager.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/services/session-manager.test.ts similarity index 99% rename from plugin-server/tests/main/ingestion-queues/session-recording/blob-ingester/session-manager.test.ts rename to plugin-server/tests/main/ingestion-queues/session-recording/services/session-manager.test.ts index c97df9c8090..ad6195c9018 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/blob-ingester/session-manager.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/services/session-manager.test.ts @@ -3,8 +3,8 @@ import { createReadStream, createWriteStream } from 'fs' import { DateTime, Settings } from 'luxon' import { defaultConfig } from '../../../../../src/config/config' -import { SessionManager } from '../../../../../src/main/ingestion-queues/session-recording/blob-ingester/session-manager' -import { now } from '../../../../../src/main/ingestion-queues/session-recording/blob-ingester/utils' +import { SessionManager } from '../../../../../src/main/ingestion-queues/session-recording/services/session-manager' +import { now } from '../../../../../src/main/ingestion-queues/session-recording/utils' import { createIncomingRecordingMessage } from '../fixtures' const createMockStream = () => { diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v1.test.ts similarity index 85% rename from plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts rename to plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v1.test.ts index dd6fc25e9fc..f3506d5cd03 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v1.test.ts @@ -3,8 +3,8 @@ import LibrdKafkaError from 'node-rdkafka-acosom/lib/error' import { Pool } from 'pg' import { defaultConfig } from '../../../../src/config/config' -import { now } from '../../../../src/main/ingestion-queues/session-recording/blob-ingester/utils' -import { eachBatch } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-consumer' +import { eachBatch } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-consumer-v1' +import { now } from '../../../../src/main/ingestion-queues/session-recording/utils' import { TeamManager } from '../../../../src/worker/ingestion/team-manager' import { createOrganization, createTeam } from '../../../helpers/sql' @@ -104,6 +104,32 @@ describe('session-recordings-consumer', () => { expect(producer.produce).toHaveBeenCalledTimes(2) }) + test('eachBatch does not emit replay event if set to other consumer', async () => { + const organizationId = await createOrganization(postgres) + const teamId = await createTeam(postgres, organizationId) + + const eachBachWithDependencies: any = eachBatch({ producer, teamManager }) + + await eachBachWithDependencies([ + { + key: 'test', + value: JSON.stringify({ + team_id: teamId, + data: 
JSON.stringify({ + event: '$snapshot', + properties: { + $snapshot_data: { events_summary: [{ timestamp: now() }] }, + $snapshot_consumer: 'v2', + }, + }), + }), + timestamp: 123, + }, + ]) + + expect(producer.produce).toHaveBeenCalledTimes(1) + }) + test('eachBatch does not emit a replay record that is more than a month in the future', async () => { const organizationId = await createOrganization(postgres) const teamId = await createTeam(postgres, organizationId) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-blob-consumer-rebalancing.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v2-rebalancing.test.ts similarity index 81% rename from plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-blob-consumer-rebalancing.test.ts rename to plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v2-rebalancing.test.ts index 3d37d519ac0..c172294125a 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-blob-consumer-rebalancing.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v2-rebalancing.test.ts @@ -3,13 +3,13 @@ import path from 'path' import { waitForExpect } from '../../../../functional_tests/expectations' import { defaultConfig } from '../../../../src/config/config' -import { SessionRecordingBlobIngester } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-blob-consumer' +import { SessionRecordingIngesterV2 } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-consumer-v2' import { Hub, PluginsServerConfig } from '../../../../src/types' import { createHub } from '../../../../src/utils/db/hub' import { UUIDT } from '../../../../src/utils/utils' import { createIncomingRecordingMessage } from './fixtures' -function assertIngesterHasExpectedPartitions(ingester: SessionRecordingBlobIngester, expectedPartitions: number[]) { +function assertIngesterHasExpectedPartitions(ingester: SessionRecordingIngesterV2, expectedPartitions: number[]) { const partitions: Set = new Set() Object.values(ingester.sessions).forEach((session) => { partitions.add(session.partition) @@ -23,8 +23,8 @@ describe('ingester rebalancing tests', () => { SESSION_RECORDING_LOCAL_DIRECTORY: '.tmp/test-session-recordings', } - let ingesterOne: SessionRecordingBlobIngester - let ingesterTwo: SessionRecordingBlobIngester + let ingesterOne: SessionRecordingIngesterV2 + let ingesterTwo: SessionRecordingIngesterV2 let hub: Hub let closeHub: () => Promise @@ -48,7 +48,7 @@ describe('ingester rebalancing tests', () => { }) it('rebalances partitions safely from one to two consumers', async () => { - ingesterOne = new SessionRecordingBlobIngester(config, hub.postgres, hub.objectStorage, hub.redisPool) + ingesterOne = new SessionRecordingIngesterV2(config, hub.postgres, hub.objectStorage, hub.redisPool) await ingesterOne.start() @@ -59,7 +59,7 @@ describe('ingester rebalancing tests', () => { assertIngesterHasExpectedPartitions(ingesterOne, [1]) }) - ingesterTwo = new SessionRecordingBlobIngester(config, hub.postgres, hub.objectStorage, hub.redisPool) + ingesterTwo = new SessionRecordingIngesterV2(config, hub.postgres, hub.objectStorage, hub.redisPool) await ingesterTwo.start() await waitForExpect(() => { diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-blob-consumer.test.ts 
b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v2.test.ts similarity index 97% rename from plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-blob-consumer.test.ts rename to plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v2.test.ts index 7d3a6f337fe..58ee5c4ccfd 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-blob-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v2.test.ts @@ -3,7 +3,7 @@ import path from 'path' import { waitForExpect } from '../../../../functional_tests/expectations' import { defaultConfig } from '../../../../src/config/config' -import { SessionRecordingBlobIngester } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-blob-consumer' +import { SessionRecordingIngesterV2 } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-consumer-v2' import { Hub, PluginsServerConfig } from '../../../../src/types' import { createHub } from '../../../../src/utils/db/hub' import { createIncomingRecordingMessage } from './fixtures' @@ -50,7 +50,7 @@ describe('ingester', () => { SESSION_RECORDING_LOCAL_DIRECTORY: '.tmp/test-session-recordings', } - let ingester: SessionRecordingBlobIngester + let ingester: SessionRecordingIngesterV2 let hub: Hub let closeHub: () => Promise @@ -84,7 +84,7 @@ describe('ingester', () => { // these tests assume that a flush won't run while they run beforeEach(async () => { - ingester = new SessionRecordingBlobIngester( + ingester = new SessionRecordingIngesterV2( { ...defaultConfig, SESSION_RECORDING_REDIS_OFFSET_STORAGE_KEY: keyPrefix, diff --git a/plugin-server/tests/main/process-event.test.ts b/plugin-server/tests/main/process-event.test.ts index c4da4e278d5..a2b7d3f426c 100644 --- a/plugin-server/tests/main/process-event.test.ts +++ b/plugin-server/tests/main/process-event.test.ts @@ -11,7 +11,6 @@ import * as IORedis from 'ioredis' import { DateTime } from 'luxon' import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../../src/config/kafka-topics' -import { RRWebEventSummary } from '../../src/main/ingestion-queues/session-recording/snapshot-segmenter' import { ClickHouseEvent, Database, @@ -20,6 +19,7 @@ import { Person, PluginsServerConfig, PropertyDefinitionTypeEnum, + RRWebEvent, Team, } from '../../src/types' import { createHub } from '../../src/utils/db/hub' @@ -1191,14 +1191,10 @@ test('capture first team event', async () => { }) test('snapshot event stored as session_recording_event', () => { - const data = createSessionRecordingEvent( - 'some-id', - team.id, - '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', - now, - '', - { $session_id: 'abcf-efg', $snapshot_data: { timestamp: 123 } } as any as Properties - ) + const data = createSessionRecordingEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', now, { + $session_id: 'abcf-efg', + $snapshot_data: { timestamp: 123 }, + } as any as Properties) expect(data).toEqual({ created_at: expect.stringMatching(/^\d{4}-\d{2}-\d{2} [\d\s:]+/), @@ -1212,7 +1208,7 @@ test('snapshot event stored as session_recording_event', () => { }) }) const sessionReplayEventTestCases: { - snapshotData: { events_summary: RRWebEventSummary[] } + snapshotData: { events_summary: RRWebEvent[] } expected: Pick< SummarizedSessionRecordingEvent, | 'click_count' @@ -1241,7 +1237,7 @@ const sessionReplayEventTestCases: { console_log_count: 0, 
console_warn_count: 0, console_error_count: 0, - size: 136, + size: 73, }, }, { @@ -1257,7 +1253,7 @@ const sessionReplayEventTestCases: { console_log_count: 0, console_warn_count: 0, console_error_count: 0, - size: 136, + size: 73, }, }, { @@ -1313,7 +1309,7 @@ const sessionReplayEventTestCases: { console_log_count: 2, console_warn_count: 3, console_error_count: 1, - size: 825, + size: 762, }, }, { @@ -1351,7 +1347,7 @@ const sessionReplayEventTestCases: { console_log_count: 0, console_warn_count: 0, console_error_count: 0, - size: 276, + size: 213, }, }, { @@ -1378,16 +1374,19 @@ const sessionReplayEventTestCases: { console_log_count: 0, console_warn_count: 0, console_error_count: 0, - size: 496, + size: 433, }, }, ] sessionReplayEventTestCases.forEach(({ snapshotData, expected }) => { test(`snapshot event ${JSON.stringify(snapshotData)} can be stored as session_replay_event`, () => { - const data = createSessionReplayEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', '', { - $session_id: 'abcf-efg', - $snapshot_data: snapshotData, - } as any as Properties) + const data = createSessionReplayEvent( + 'some-id', + team.id, + '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', + 'abcf-efg', + snapshotData.events_summary + ) const expectedEvent: SummarizedSessionRecordingEvent = { distinct_id: '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', @@ -1402,37 +1401,29 @@ sessionReplayEventTestCases.forEach(({ snapshotData, expected }) => { test(`snapshot event with no event summary is ignored`, () => { expect(() => { - createSessionReplayEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', '', { - $session_id: 'abcf-efg', - $snapshot_data: {}, - } as any as Properties) + createSessionReplayEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', 'abcf-efg', []) }).toThrowError() }) test(`snapshot event with no event summary timestamps is ignored`, () => { expect(() => { - createSessionReplayEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', '', { - $session_id: 'abcf-efg', - $snapshot_data: { - events_summary: [ - { - type: 5, - data: { - payload: { - // doesn't match because href is nested in payload - href: 'http://127.0.0.1:8000/home', - }, - }, + createSessionReplayEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', 'abcf-efg', [ + { + type: 5, + data: { + payload: { + // doesn't match because href is nested in payload + href: 'http://127.0.0.1:8000/home', }, - { - type: 4, - data: { - href: 'http://127.0.0.1:8000/second/url', - }, - }, - ], + }, }, - } as any as Properties) + { + type: 4, + data: { + href: 'http://127.0.0.1:8000/second/url', + }, + }, + ] as any[]) }).toThrowError() }) diff --git a/posthog/api/capture.py b/posthog/api/capture.py index a0fb329fee9..ff669fa97fc 100644 --- a/posthog/api/capture.py +++ b/posthog/api/capture.py @@ -164,6 +164,7 @@ def log_event(data: Dict, event_name: str, partition_key: Optional[str]): producer = sessionRecordingKafkaProducer() else: producer = KafkaProducer() + future = producer.produce(topic=kafka_topic, data=data, key=partition_key) statsd.incr("posthog_cloud_plugin_server_ingestion") return future @@ -365,6 +366,8 @@ def get_event(request): # NOTE: Whilst we are testing this code we want to track exceptions but allow the events through if anything goes wrong capture_exception(e) + consumer_destination = "v2" if random() <= settings.REPLAY_EVENTS_NEW_CONSUMER_RATIO else "v1" + try: replay_events, other_events = split_replay_events(events) 
processed_replay_events = replay_events @@ -373,6 +376,10 @@ def get_event(request): # Legacy solution stays in place processed_replay_events = legacy_preprocess_session_recording_events_for_clickhouse(replay_events) + # Mark all events so that they are only consumed by one consumer + for event in processed_replay_events: + event["properties"]["$snapshot_consumer"] = consumer_destination + events = processed_replay_events + other_events except ValueError as e: @@ -445,12 +452,16 @@ def get_event(request): ) try: - if replay_events and random() <= settings.REPLAY_BLOB_INGESTION_TRAFFIC_RATIO: + if replay_events: # The new flow we only enable if the dedicated kafka is enabled alternative_replay_events = preprocess_replay_events_for_blob_ingestion( replay_events, settings.SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES ) + # Mark all events so that they are only consumed by one consumer + for event in alternative_replay_events: + event["properties"]["$snapshot_consumer"] = consumer_destination + futures = [] # We want to be super careful with our new ingestion flow for now so the whole thing is separated diff --git a/posthog/api/test/__snapshots__/test_cohort.ambr b/posthog/api/test/__snapshots__/test_cohort.ambr index 4770b40106a..32569a937c9 100644 --- a/posthog/api/test/__snapshots__/test_cohort.ambr +++ b/posthog/api/test/__snapshots__/test_cohort.ambr @@ -1,6 +1,6 @@ # name: TestCohort.test_async_deletion_of_cohort ' - /* user_id:103 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:102 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ SELECT count(DISTINCT person_id) FROM cohortpeople WHERE team_id = 2 @@ -10,7 +10,7 @@ --- # name: TestCohort.test_async_deletion_of_cohort.1 ' - /* user_id:103 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:102 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ INSERT INTO cohortpeople SELECT id, 2 as cohort_id, @@ -114,7 +114,7 @@ --- # name: TestCohort.test_async_deletion_of_cohort.2 ' - /* user_id:103 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:102 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ SELECT count(DISTINCT person_id) FROM cohortpeople WHERE team_id = 2 @@ -124,7 +124,7 @@ --- # name: TestCohort.test_async_deletion_of_cohort.3 ' - /* user_id:103 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */ + /* user_id:102 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */ SELECT count() FROM cohortpeople WHERE team_id = 2 @@ -134,7 +134,7 @@ --- # name: TestCohort.test_async_deletion_of_cohort.4 ' - /* user_id:103 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:102 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ SELECT count(DISTINCT person_id) FROM cohortpeople WHERE team_id = 2 @@ -144,7 +144,7 @@ --- # name: TestCohort.test_async_deletion_of_cohort.5 ' - /* user_id:103 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:102 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ INSERT INTO cohortpeople SELECT id, 2 as cohort_id, @@ -178,7 +178,7 @@ --- # name: TestCohort.test_async_deletion_of_cohort.6 ' - /* user_id:103 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:102 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ SELECT count(DISTINCT person_id) FROM cohortpeople WHERE team_id = 2 @@ -188,7 +188,7 @@ --- # name: TestCohort.test_async_deletion_of_cohort.7 ' - /* user_id:103 
celery:posthog.tasks.calculate_cohort.clear_stale_cohort */ + /* user_id:102 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */ SELECT count() FROM cohortpeople WHERE team_id = 2 diff --git a/posthog/api/test/test_capture.py b/posthog/api/test/test_capture.py index 463225e929a..168fe1e1e4f 100644 --- a/posthog/api/test/test_capture.py +++ b/posthog/api/test/test_capture.py @@ -1186,7 +1186,7 @@ class TestCapture(BaseTest): def test_legacy_recording_ingestion_data_sent_to_kafka(self, kafka_produce) -> None: session_id = "some_session_id" self._send_session_recording_event(session_id=session_id) - self.assertEqual(kafka_produce.call_count, 1) + self.assertEqual(kafka_produce.call_count, 2) kafka_topic_used = kafka_produce.call_args_list[0][1]["topic"] self.assertEqual(kafka_topic_used, KAFKA_SESSION_RECORDING_EVENTS) key = kafka_produce.call_args_list[0][1]["key"] @@ -1204,17 +1204,16 @@ class TestCapture(BaseTest): snapshot_source = 8 snapshot_type = 8 event_data = {"foo": "bar"} - with self.settings(REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=0): - self._send_session_recording_event( - timestamp=timestamp, - snapshot_source=snapshot_source, - snapshot_type=snapshot_type, - session_id=session_id, - distinct_id=distinct_id, - window_id=window_id, - event_data=event_data, - ) - self.assertEqual(kafka_produce.call_count, 1) + self._send_session_recording_event( + timestamp=timestamp, + snapshot_source=snapshot_source, + snapshot_type=snapshot_type, + session_id=session_id, + distinct_id=distinct_id, + window_id=window_id, + event_data=event_data, + ) + self.assertEqual(kafka_produce.call_count, 2) self.assertEqual(kafka_produce.call_args_list[0][1]["topic"], KAFKA_SESSION_RECORDING_EVENTS) key = kafka_produce.call_args_list[0][1]["key"] self.assertEqual(key, session_id) @@ -1225,6 +1224,7 @@ class TestCapture(BaseTest): { "event": "$snapshot", "properties": { + "$snapshot_consumer": "v1", "$snapshot_data": { "chunk_count": 1, "chunk_id": "fake-uuid", @@ -1253,12 +1253,13 @@ class TestCapture(BaseTest): self._send_session_recording_event(event_data=large_data_array) topic_counter = Counter([call[1]["topic"] for call in kafka_produce.call_args_list]) - assert topic_counter == Counter({KAFKA_SESSION_RECORDING_EVENTS: 3}) + assert topic_counter == Counter( + {KAFKA_SESSION_RECORDING_EVENTS: 3, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS: 1} + ) @patch("posthog.kafka_client.client._KafkaProducer.produce") def test_recording_ingestion_can_write_to_blob_ingestion_topic_with_usual_size_limit(self, kafka_produce) -> None: with self.settings( - REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=1, SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=512, ): self._send_session_recording_event(event_data=large_data_array) @@ -1272,7 +1273,6 @@ class TestCapture(BaseTest): @patch("posthog.kafka_client.client._KafkaProducer.produce") def test_recording_ingestion_can_write_to_blob_ingestion_topic(self, kafka_produce) -> None: with self.settings( - REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=1, SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=20480, ): self._send_session_recording_event(event_data=large_data_array) @@ -1292,7 +1292,6 @@ class TestCapture(BaseTest): KAFKA_SECURITY_PROTOCOL="SASL_SSL", SESSION_RECORDING_KAFKA_HOSTS=["another-server:9092", "a-fourth.server:9092"], SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL="SSL", - REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=1, SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=1234, ): # avoid logs from being printed because the mock is None @@ -1323,7 +1322,6 @@ class TestCapture(BaseTest): with 
self.settings( KAFKA_HOSTS=["first.server:9092", "second.server:9092"], SESSION_RECORDING_KAFKA_HOSTS=["another-server:9092", "a-fourth.server:9092"], - REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=1, ): default_kafka_producer_mock.return_value = KafkaProducer() session_recording_producer_factory_mock.return_value = sessionRecordingKafkaProducer() @@ -1341,27 +1339,6 @@ class TestCapture(BaseTest): assert data_sent_to_recording_kafka["event"] == "$snapshot_items" assert len(data_sent_to_recording_kafka["properties"]["$snapshot_items"]) == 1 - @patch("posthog.api.capture.sessionRecordingKafkaProducer") - @patch("posthog.api.capture.KafkaProducer") - @patch("posthog.kafka_client.client._KafkaProducer.produce") - def test_uses_does_not_produce_if_blob_ingestion_disabled( - self, - kafka_produce: MagicMock, - default_kafka_producer_mock: MagicMock, - session_recording_producer_mock: MagicMock, - ) -> None: - with self.settings(REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=0): - default_kafka_producer_mock.return_value = KafkaProducer() - session_recording_producer_mock.side_effect = sessionRecordingKafkaProducer() - - data = "example" - self._send_session_recording_event(event_data=data) - - default_kafka_producer_mock.assert_called() - session_recording_producer_mock.assert_not_called() - assert len(kafka_produce.call_args_list) == 1 - assert json.loads(kafka_produce.call_args_list[0][1]["data"]["data"])["event"] == "$snapshot" - def test_get_distinct_id_non_json_properties(self) -> None: with self.assertRaises(ValueError): get_distinct_id({"properties": "str"}) @@ -1415,7 +1392,7 @@ class TestCapture(BaseTest): replace_limited_team_tokens(QuotaResource.RECORDINGS, {self.team.api_token: timezone.now().timestamp() + 10000}) replace_limited_team_tokens(QuotaResource.EVENTS, {self.team.api_token: timezone.now().timestamp() + 10000}) self._send_session_recording_event() - self.assertEqual(kafka_produce.call_count, 1) + self.assertEqual(kafka_produce.call_count, 2) @patch("posthog.kafka_client.client._KafkaProducer.produce") @pytest.mark.ee @@ -1440,11 +1417,11 @@ class TestCapture(BaseTest): with self.settings(QUOTA_LIMITING_ENABLED=True): _produce_events() - self.assertEqual(kafka_produce.call_count, 3) + self.assertEqual(kafka_produce.call_count, 4) replace_limited_team_tokens(QuotaResource.EVENTS, {self.team.api_token: timezone.now().timestamp() + 10000}) _produce_events() - self.assertEqual(kafka_produce.call_count, 1) # Only the recording event + self.assertEqual(kafka_produce.call_count, 2) # Only the recording event replace_limited_team_tokens( QuotaResource.RECORDINGS, {self.team.api_token: timezone.now().timestamp() + 10000} @@ -1457,7 +1434,7 @@ class TestCapture(BaseTest): ) replace_limited_team_tokens(QuotaResource.EVENTS, {self.team.api_token: timezone.now().timestamp() - 10000}) _produce_events() - self.assertEqual(kafka_produce.call_count, 3) # All events as limit-until timestamp is in the past + self.assertEqual(kafka_produce.call_count, 4) # All events as limit-until timestamp is in the past @patch("posthog.kafka_client.client._KafkaProducer.produce") def test_capture_historical_analytics_events(self, kafka_produce) -> None: diff --git a/posthog/settings/ingestion.py b/posthog/settings/ingestion.py index 3b554435d01..6f5664470c8 100644 --- a/posthog/settings/ingestion.py +++ b/posthog/settings/ingestion.py @@ -31,10 +31,10 @@ PARTITION_KEY_BUCKET_REPLENTISH_RATE = get_from_env( ) REPLAY_EVENT_MAX_SIZE = get_from_env("REPLAY_EVENT_MAX_SIZE", type_cast=int, default=1024 * 512) # 512kb 
-REPLAY_BLOB_INGESTION_TRAFFIC_RATIO = get_from_env("REPLAY_BLOB_INGESTION_TRAFFIC_RATIO", type_cast=float, default=0.0) +REPLAY_EVENTS_NEW_CONSUMER_RATIO = get_from_env("REPLAY_EVENTS_NEW_CONSUMER_RATIO", type_cast=float, default=0.0) -if REPLAY_BLOB_INGESTION_TRAFFIC_RATIO > 1 or REPLAY_BLOB_INGESTION_TRAFFIC_RATIO < 0: +if REPLAY_EVENTS_NEW_CONSUMER_RATIO > 1 or REPLAY_EVENTS_NEW_CONSUMER_RATIO < 0: logger.critical( - "Environment variable REPLAY_BLOB_INGESTION_TRAFFIC_RATIO is not between 0 and 1. Setting to 0 to be safe." + "Environment variable REPLAY_EVENTS_NEW_CONSUMER_RATIO is not between 0 and 1. Setting to 0 to be safe." ) - REPLAY_BLOB_INGESTION_TRAFFIC_RATIO = 0 + REPLAY_EVENTS_NEW_CONSUMER_RATIO = 0
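For reference, a minimal TypeScript usage sketch of the reworked createSessionReplayEvent signature (uuid, team_id, distinct_id, session_id, events) introduced in process-event.ts above. This is not part of the patch: the import paths assume a file under plugin-server/src, and the team id, timestamps and event payloads are illustrative assumptions only.

    import { createSessionReplayEvent } from './worker/ingestion/process-event'
    import { RRWebEvent } from './types'

    // Two rrweb events: a full snapshot (type 2) carrying an href, followed by a
    // mouse click (type 3 with data.source === 2), which should count as one click.
    const events: RRWebEvent[] = [
        { type: 2, timestamp: 1689600000000, data: { href: 'http://127.0.0.1:8000/home' } },
        { type: 3, timestamp: 1689600001000, data: { source: 2 } },
    ]

    const replayRow = createSessionReplayEvent(
        'some-uuid',
        2, // team_id (illustrative)
        'some-distinct-id',
        'some-session-id', // session_id is now passed explicitly instead of being read from properties
        events
    )

    // Per the patched implementation: replayRow.click_count === 1 (one type-3 event with
    // data.source === 2), first_timestamp / last_timestamp come from the sorted event
    // timestamps, and size is Buffer.byteLength(JSON.stringify(events), 'utf8').

Calling it with an empty events array throws, matching the "ignoring an empty session recording event" guard exercised in the updated process-event tests.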