From bcbe300c22f9a28718736db0bdd2a845cfd70bd3 Mon Sep 17 00:00:00 2001 From: Ben White Date: Mon, 26 Feb 2024 09:43:27 +0100 Subject: [PATCH] feat: Replay ingestion with EFS (#20487) --- .vscode/launch.json | 3 +- bin/start | 1 + docker-compose.hobby.yml | 2 + plugin-server/jest.config.js | 1 + plugin-server/src/capabilities.ts | 9 +- .../session-recording/process-event.ts | 29 +- .../services/session-manager-v3.ts | 477 ++++++++++++++++++ .../session-recordings-consumer-v3.ts | 458 +++++++++++++++++ plugin-server/src/main/pluginsServer.ts | 22 + plugin-server/src/types.ts | 113 +---- .../session-recording/process-event.test.ts | 70 --- .../services/session-manager-v3.test.ts | 251 +++++++++ .../session-recordings-consumer-v3.test.ts | 295 +++++++++++ .../session_recording_api.py | 19 +- posthog/settings/session_replay.py | 2 + 15 files changed, 1539 insertions(+), 213 deletions(-) create mode 100644 plugin-server/src/main/ingestion-queues/session-recording/services/session-manager-v3.ts create mode 100644 plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts create mode 100644 plugin-server/tests/main/ingestion-queues/session-recording/services/session-manager-v3.test.ts create mode 100644 plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts diff --git a/.vscode/launch.json b/.vscode/launch.json index b4206a25f00..36c386906f1 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -72,7 +72,8 @@ "DATABASE_URL": "postgres://posthog:posthog@localhost:5432/posthog", "SKIP_SERVICE_VERSION_REQUIREMENTS": "1", "PRINT_SQL": "1", - "BILLING_SERVICE_URL": "https://billing.dev.posthog.dev" + "BILLING_SERVICE_URL": "https://billing.dev.posthog.dev", + "RECORDINGS_INGESTER_URL": "http://localhost:6738" }, "console": "integratedTerminal", "python": "${workspaceFolder}/env/bin/python", diff --git a/bin/start b/bin/start index 4cb46f4ee7a..5acde4c7c33 100755 --- a/bin/start +++ b/bin/start @@ -7,6 +7,7 @@ trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT export DEBUG=${DEBUG:-1} export SKIP_SERVICE_VERSION_REQUIREMENTS=1 export BILLING_SERVICE_URL=${BILLING_SERVICE_URL:-https://billing.dev.posthog.dev} +export RECORDINGS_INGESTER_URL=${RECORDINGS_INGESTER_URL:-http://localhost:6738} service_warning() { echo -e "\033[0;31m$1 isn't ready. 
You can run the stack with:\ndocker compose -f docker-compose.dev.yml up\nIf you have already ran that, just make sure that services are starting properly, and sit back.\nWaiting for $1 to start...\033[0m" diff --git a/docker-compose.hobby.yml b/docker-compose.hobby.yml index 059efadf8b9..aea3280216c 100644 --- a/docker-compose.hobby.yml +++ b/docker-compose.hobby.yml @@ -74,6 +74,8 @@ services: SENTRY_DSN: $SENTRY_DSN SITE_URL: https://$DOMAIN SECRET_KEY: $POSTHOG_SECRET + RECORDINGS_INGESTER_URL: http://plugins:6738 + plugins: extends: file: docker-compose.base.yml diff --git a/plugin-server/jest.config.js b/plugin-server/jest.config.js index ccc988e330f..e5dc2f08c4c 100644 --- a/plugin-server/jest.config.js +++ b/plugin-server/jest.config.js @@ -8,4 +8,5 @@ module.exports = { setupFilesAfterEnv: ['./jest.setup.fetch-mock.js'], testMatch: ['/tests/**/*.test.ts'], testTimeout: 60000, + modulePathIgnorePatterns: ['/.tmp/'], } diff --git a/plugin-server/src/capabilities.ts b/plugin-server/src/capabilities.ts index c9b444067ee..e2543d66adb 100644 --- a/plugin-server/src/capabilities.ts +++ b/plugin-server/src/capabilities.ts @@ -1,5 +1,5 @@ import { PluginServerCapabilities, PluginServerMode, PluginsServerConfig, stringToPluginServerMode } from './types' -import { isTestEnv } from './utils/env-utils' +import { isDevEnv, isTestEnv } from './utils/env-utils' export function getPluginServerCapabilities(config: PluginsServerConfig): PluginServerCapabilities { const mode: PluginServerMode | null = config.PLUGIN_SERVER_MODE @@ -19,6 +19,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin processAsyncOnEventHandlers: true, processAsyncWebhooksHandlers: true, sessionRecordingBlobIngestion: true, + sessionRecordingV3Ingestion: isDevEnv(), personOverrides: true, appManagementSingleton: true, preflightSchedules: true, @@ -55,6 +56,12 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin sessionRecordingBlobIngestion: true, ...sharedCapabilities, } + + case PluginServerMode.recordings_ingestion_v3: + return { + sessionRecordingV3Ingestion: true, + ...sharedCapabilities, + } case PluginServerMode.async_onevent: return { processAsyncOnEventHandlers: true, diff --git a/plugin-server/src/main/ingestion-queues/session-recording/process-event.ts b/plugin-server/src/main/ingestion-queues/session-recording/process-event.ts index 7f5b7b3e022..09e80d42037 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/process-event.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/process-event.ts @@ -1,14 +1,7 @@ -import { Properties } from '@posthog/plugin-scaffold' import { captureException } from '@sentry/node' import { DateTime } from 'luxon' -import { - ClickHouseTimestamp, - PerformanceEventReverseMapping, - RawPerformanceEvent, - RRWebEvent, - TimestampFormat, -} from '../../../types' +import { ClickHouseTimestamp, RRWebEvent, TimestampFormat } from '../../../types' import { status } from '../../../utils/status' import { castTimestampOrNow } from '../../../utils/utils' import { activeMilliseconds } from './snapshot-segmenter' @@ -271,23 +264,3 @@ export const createSessionReplayEvent = ( return data } - -export function createPerformanceEvent(uuid: string, team_id: number, distinct_id: string, properties: Properties) { - const data: Partial = { - uuid, - team_id: team_id, - distinct_id: distinct_id, - session_id: properties['$session_id'], - window_id: properties['$window_id'], - pageview_id: 
properties['$pageview_id'], - current_url: properties['$current_url'], - } - - Object.entries(PerformanceEventReverseMapping).forEach(([key, value]) => { - if (key in properties) { - data[value] = properties[key] - } - }) - - return data -} diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager-v3.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager-v3.ts new file mode 100644 index 00000000000..deb9a9d1fe6 --- /dev/null +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager-v3.ts @@ -0,0 +1,477 @@ +import { Upload } from '@aws-sdk/lib-storage' +import { captureException, captureMessage } from '@sentry/node' +import { createReadStream, createWriteStream, WriteStream } from 'fs' +import { mkdir, readdir, readFile, rename, rmdir, stat, unlink, writeFile } from 'fs/promises' +import path from 'path' +import { Counter, Histogram } from 'prom-client' +import { PassThrough } from 'stream' +import { pipeline } from 'stream/promises' +import * as zlib from 'zlib' + +import { PluginsServerConfig } from '../../../../types' +import { status } from '../../../../utils/status' +import { asyncTimeoutGuard } from '../../../../utils/timing' +import { ObjectStorage } from '../../../services/object_storage' +import { IncomingRecordingMessage } from '../types' +import { convertToPersistedMessage, maxDefined, minDefined, now } from '../utils' + +const BUCKETS_LINES_WRITTEN = [0, 10, 50, 100, 500, 1000, 2000, 5000, 10000, Infinity] +export const BUCKETS_KB_WRITTEN = [0, 128, 512, 1024, 5120, 10240, 20480, 51200, 102400, 204800, Infinity] +const S3_UPLOAD_WARN_TIME_SECONDS = 2 * 60 * 1000 + +// NOTE: To remove once released +const metricPrefix = 'v3_' + +export const FILE_EXTENSION = '.jsonl' +export const BUFFER_FILE_NAME = `buffer${FILE_EXTENSION}` +export const FLUSH_FILE_EXTENSION = `.flush${FILE_EXTENSION}` + +const counterS3FilesWritten = new Counter({ + name: metricPrefix + 'recording_s3_files_written', + help: 'A single file flushed to S3', + labelNames: ['flushReason'], +}) + +const counterS3WriteErrored = new Counter({ + name: metricPrefix + 'recording_s3_write_errored', + help: 'Indicates that we failed to flush to S3 without recovering', +}) + +const histogramS3LinesWritten = new Histogram({ + name: metricPrefix + 'recording_s3_lines_written_histogram', + help: 'The number of lines in a file we send to s3', + buckets: BUCKETS_LINES_WRITTEN, +}) + +const histogramS3KbWritten = new Histogram({ + name: metricPrefix + 'recording_blob_ingestion_s3_kb_written', + help: 'The uncompressed size of file we send to S3', + buckets: BUCKETS_KB_WRITTEN, +}) + +const histogramSessionAgeSeconds = new Histogram({ + name: metricPrefix + 'recording_blob_ingestion_session_age_seconds', + help: 'The age of current sessions in seconds', + buckets: [0, 60, 60 * 2, 60 * 5, 60 * 8, 60 * 10, 60 * 12, 60 * 15, 60 * 20, Infinity], +}) + +const histogramSessionSizeKb = new Histogram({ + name: metricPrefix + 'recording_blob_ingestion_session_size_kb', + help: 'The size of current sessions in kb', + buckets: BUCKETS_KB_WRITTEN, +}) + +const histogramFlushTimeSeconds = new Histogram({ + name: metricPrefix + 'recording_blob_ingestion_session_flush_time_seconds', + help: 'The time taken to flush a session in seconds', + buckets: [0, 2, 5, 10, 20, 30, 60, 120, 180, 300, Infinity], +}) + +const histogramSessionSize = new Histogram({ + name: metricPrefix + 'recording_blob_ingestion_session_lines', + help: 'The size of sessions in 
numbers of lines', + buckets: BUCKETS_LINES_WRITTEN, +}) + +const writeStreamBlocked = new Counter({ + name: metricPrefix + 'recording_blob_ingestion_write_stream_blocked', + help: 'Number of times we get blocked by the stream backpressure', +}) + +export type SessionManagerBufferContext = { + sizeEstimate: number + count: number + eventsRange: { + firstTimestamp: number + lastTimestamp: number + } | null + createdAt: number +} + +export type SessionBuffer = { + context: SessionManagerBufferContext + fileStream: WriteStream +} + +// Context that is updated and persisted to disk so must be serializable +export type SessionManagerContext = { + dir: string + sessionId: string + teamId: number + partition: number +} + +export class SessionManagerV3 { + buffer?: SessionBuffer + flushPromise?: Promise + destroying = false + inProgressUpload: Upload | null = null + flushJitterMultiplier: number + + constructor( + public readonly serverConfig: PluginsServerConfig, + public readonly s3Client: ObjectStorage['s3'], + public readonly context: SessionManagerContext + ) { + // We add a jitter multiplier to the buffer age so that we don't have all sessions flush at the same time + this.flushJitterMultiplier = 1 - Math.random() * serverConfig.SESSION_RECORDING_BUFFER_AGE_JITTER + } + + private file(name: string): string { + return path.join(this.context.dir, name) + } + + public static async create( + serverConfig: PluginsServerConfig, + s3Client: ObjectStorage['s3'], + context: SessionManagerContext + ): Promise { + const manager = new SessionManagerV3(serverConfig, s3Client, context) + await mkdir(context.dir, { recursive: true }) + + try { + const fileExists = await stat(manager.file('metadata.json')).then( + () => true, + () => false + ) + if (fileExists) { + const bufferMetadata: SessionManagerBufferContext = JSON.parse( + await readFile(manager.file('metadata.json'), 'utf-8') + ) + manager.buffer = { + context: bufferMetadata, + fileStream: manager.createFileStreamFor(path.join(context.dir, BUFFER_FILE_NAME)), + } + } + } catch (error) { + // Indicates no buffer metadata file or it's corrupted + status.error('๐Ÿงจ', '[session-manager] failed to read buffer metadata', { + ...context, + error, + }) + } + + status.info('๐Ÿ“ฆ', '[session-manager] started new manager', { + ...manager.context, + ...(manager.buffer?.context ?? {}), + }) + return manager + } + + private async syncMetadata(): Promise { + if (this.buffer) { + await writeFile(this.file('metadata.json'), JSON.stringify(this.buffer?.context), 'utf-8') + } else { + await unlink(this.file('metadata.json')) + } + } + + private async getFlushFiles(): Promise { + return (await readdir(this.context.dir)).filter((file) => file.endsWith(FLUSH_FILE_EXTENSION)) + } + + private captureException(error: Error, extra: Record = {}): void { + captureException(error, { + extra: { ...this.context, ...extra }, + tags: { teamId: this.context.teamId, sessionId: this.context.sessionId }, + }) + } + + private captureMessage(message: string, extra: Record = {}): void { + const context = this.context + captureMessage(message, { + extra: { ...context, ...extra }, + tags: { teamId: context.teamId, sessionId: context.sessionId }, + }) + } + + public async add(message: IncomingRecordingMessage): Promise { + if (this.destroying) { + return + } + + try { + const buffer = this.getOrCreateBuffer() + const messageData = convertToPersistedMessage(message) + const start = message.events.at(0)?.timestamp + const end = message.events.at(-1)?.timestamp ?? 
start + + if (!start || !end) { + captureMessage("[session-manager]: can't set events range from message without events summary", { + extra: { message }, + }) + return + } + + buffer.context.eventsRange = { + firstTimestamp: minDefined(start, buffer.context.eventsRange?.firstTimestamp) ?? start, + lastTimestamp: maxDefined(end, buffer.context.eventsRange?.lastTimestamp) ?? end, + } + + const content = JSON.stringify(messageData) + '\n' + buffer.context.count += 1 + buffer.context.sizeEstimate += content.length + + if (!buffer.fileStream.write(content, 'utf-8')) { + writeStreamBlocked.inc() + await new Promise((r) => buffer.fileStream.once('drain', r)) + } + + await this.syncMetadata() + } catch (error) { + this.captureException(error, { message }) + throw error + } + } + + public async isEmpty(): Promise { + return !this.buffer?.context.count && !(await this.getFlushFiles()).length + } + + public async flush(force = false): Promise { + if (this.destroying) { + return + } + + if (!force) { + await this.maybeFlushCurrentBuffer() + } else { + // This is mostly used by tests + await this.markCurrentBufferForFlush('rebalance') + } + + await this.flushFiles() + } + + private async maybeFlushCurrentBuffer(): Promise { + if (!this.buffer) { + return + } + + if (this.buffer.context.sizeEstimate >= this.serverConfig.SESSION_RECORDING_MAX_BUFFER_SIZE_KB * 1024) { + return this.markCurrentBufferForFlush('buffer_size') + } + + const flushThresholdMs = this.serverConfig.SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS * 1000 + const flushThresholdJitteredMs = flushThresholdMs * this.flushJitterMultiplier + + const logContext: Record = { + ...this.context, + flushThresholdMs, + flushThresholdJitteredMs, + } + + if (!this.buffer.context.count) { + status.warn('๐Ÿšฝ', `[session-manager] buffer has no items yet`, { logContext }) + return + } + + const bufferAgeInMemoryMs = now() - this.buffer.context.createdAt + + // check the in-memory age against a larger value than the flush threshold, + // otherwise we'll flap between reasons for flushing when close to real-time processing + const isSessionAgeOverThreshold = bufferAgeInMemoryMs >= flushThresholdJitteredMs + + logContext['bufferAgeInMemoryMs'] = bufferAgeInMemoryMs + logContext['isSessionAgeOverThreshold'] = isSessionAgeOverThreshold + + histogramSessionAgeSeconds.observe(bufferAgeInMemoryMs / 1000) + histogramSessionSize.observe(this.buffer.context.count) + histogramSessionSizeKb.observe(this.buffer.context.sizeEstimate / 1024) + + if (isSessionAgeOverThreshold) { + return this.markCurrentBufferForFlush('buffer_age') + } + } + + private async markCurrentBufferForFlush(reason: 'buffer_size' | 'buffer_age' | 'rebalance'): Promise { + const buffer = this.buffer + if (!buffer) { + // TODO: maybe error properly here? 
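// A sketch of what "erroring properly" could look like here (illustrative only, not part of
// this diff) - the class already has a Sentry helper, so something along the lines of
//   this.captureMessage('[session-manager] flush requested but no buffer exists', { reason })
// would at least surface how often this branch is hit before we bail out below.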
+ return + } + + if (!buffer.context.eventsRange || !buffer.context.count) { + // Indicates some issue with the buffer so we can close out + this.buffer = undefined + return + } + + // ADD FLUSH METRICS HERE + + const { firstTimestamp, lastTimestamp } = buffer.context.eventsRange + const fileName = `${firstTimestamp}-${lastTimestamp}${FLUSH_FILE_EXTENSION}` + + counterS3FilesWritten.labels(reason).inc(1) + histogramS3LinesWritten.observe(buffer.context.count) + histogramS3KbWritten.observe(buffer.context.sizeEstimate / 1024) + + // NOTE: We simplify everything by keeping the files as the same name for S3 + await new Promise((resolve) => buffer.fileStream.end(resolve)) + await rename(this.file(BUFFER_FILE_NAME), this.file(fileName)) + this.buffer = undefined + + await this.syncMetadata() + } + + private async flushFiles(): Promise { + // We read all files marked for flushing and write them to S3 + const filesToFlush = await this.getFlushFiles() + await Promise.all(filesToFlush.map((file) => this.flushFile(file))) + } + + private async flushFile(filename: string): Promise { + status.info('๐Ÿšฝ', '[session-manager] flushing file to S3', { + filename, + ...this.context, + }) + if (this.destroying) { + status.warn('๐Ÿšฝ', '[session-manager] flush called but we are in a destroying state', { + ...this.context, + }) + return + } + + const file = this.file(filename) + + const deleteFile = async () => { + await stat(file) + await unlink(file) + } + + const endFlushTimer = histogramFlushTimeSeconds.startTimer() + + try { + const targetFileName = filename.replace(FLUSH_FILE_EXTENSION, FILE_EXTENSION) + const baseKey = `${this.serverConfig.SESSION_RECORDING_REMOTE_FOLDER}/team_id/${this.context.teamId}/session_id/${this.context.sessionId}` + const dataKey = `${baseKey}/data/${targetFileName}` + + const readStream = createReadStream(file) + const uploadStream = new PassThrough() + + // The compressed file + pipeline(readStream, zlib.createGzip(), uploadStream).catch((error) => { + // TODO: If this actually happens we probably want to destroy the buffer as we will be stuck... + status.error('๐Ÿงจ', '[session-manager] writestream errored', { + ...this.context, + error, + }) + this.captureException(error) + }) + + readStream.on('error', (err) => { + // TODO: What should we do here? + status.error('๐Ÿงจ', '[session-manager] readstream errored', { + ...this.context, + error: err, + }) + + this.captureException(err) + }) + + const inProgressUpload = (this.inProgressUpload = new Upload({ + client: this.s3Client, + params: { + Bucket: this.serverConfig.OBJECT_STORAGE_BUCKET, + Key: dataKey, + ContentEncoding: 'gzip', + ContentType: 'application/json', + Body: uploadStream, + }, + })) + + await asyncTimeoutGuard( + { + message: 'session-manager.flush uploading file to S3 delayed.', + timeout: S3_UPLOAD_WARN_TIME_SECONDS, + }, + async () => { + await inProgressUpload.done() + } + ) + } catch (error: any) { + // TRICKY: error can for some reason sometimes be undefined... 
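// Normalising the value first means the logging and Sentry calls below can safely read
// `error.name` and `error.message` even when the rejection value really is `undefined`.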
+ error = error || new Error('Unknown Error') + + if (error.name === 'AbortError' && this.destroying) { + // abort of inProgressUpload while destroying is expected + return + } + + await this.inProgressUpload?.abort() + + // TODO: If we fail to write to S3 we should be do something about it + status.error('๐Ÿงจ', '[session-manager] failed writing session recording blob to S3', { + errorMessage: `${error.name || 'Unknown Error Type'}: ${error.message}`, + error, + ...this.context, + }) + this.captureException(error) + counterS3WriteErrored.inc() + + throw error + } finally { + endFlushTimer() + await deleteFile() + } + } + + private getOrCreateBuffer(): SessionBuffer { + if (!this.buffer) { + try { + const context: SessionManagerBufferContext = { + sizeEstimate: 0, + count: 0, + eventsRange: null, + createdAt: now(), + } + const buffer: SessionBuffer = { + context, + fileStream: this.createFileStreamFor(this.file(BUFFER_FILE_NAME)), + } + + this.buffer = buffer + } catch (error) { + this.captureException(error) + throw error + } + } + + return this.buffer as SessionBuffer + } + + protected createFileStreamFor(file: string): WriteStream { + return createWriteStream(file, { + // Opens in append mode in case it already exists + flags: 'a', + encoding: 'utf-8', + }) + } + + public async stop(): Promise { + this.destroying = true + if (this.inProgressUpload !== null) { + await this.inProgressUpload.abort().catch((error) => { + status.error('๐Ÿงจ', '[session-manager][realtime] failed to abort in progress upload', { + ...this.context, + error, + }) + this.captureException(error) + }) + this.inProgressUpload = null + } + + const buffer = this.buffer + if (buffer) { + await new Promise((resolve) => buffer.fileStream.end(resolve)) + } + + if (await this.isEmpty()) { + status.info('๐Ÿงจ', '[session-manager] removing empty session directory', { + ...this.context, + }) + + await rmdir(this.context.dir, { recursive: true }) + } + } +} diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts new file mode 100644 index 00000000000..58a741487e5 --- /dev/null +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts @@ -0,0 +1,458 @@ +import { captureException } from '@sentry/node' +import { createReadStream } from 'fs' +import { readdir, stat } from 'fs/promises' +import { features, KafkaConsumer, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka' +import path from 'path' +import { Counter, Gauge, Histogram } from 'prom-client' + +import { sessionRecordingConsumerConfig } from '../../../config/config' +import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics' +import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer' +import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config' +import { PluginsServerConfig, TeamId } from '../../../types' +import { BackgroundRefresher } from '../../../utils/background-refresher' +import { PostgresRouter } from '../../../utils/db/postgres' +import { status } from '../../../utils/status' +import { fetchTeamTokensWithRecordings } from '../../../worker/ingestion/team-manager' +import { expressApp } from '../../services/http-server' +import { ObjectStorage } from '../../services/object_storage' +import { runInstrumentedFunction } from '../../utils' +import { addSentryBreadcrumbsEventListeners } from 
'../kafka-metrics' +import { BUCKETS_KB_WRITTEN, BUFFER_FILE_NAME, SessionManagerV3 } from './services/session-manager-v3' +import { IncomingRecordingMessage } from './types' +import { parseKafkaMessage } from './utils' + +// Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals +require('@sentry/tracing') + +// WARNING: Do not change this - it will essentially reset the consumer +const KAFKA_CONSUMER_GROUP_ID = 'session-replay-ingester' +const KAFKA_CONSUMER_SESSION_TIMEOUT_MS = 30000 + +// NOTE: To remove once released +const metricPrefix = 'v3_' + +const gaugeSessionsHandled = new Gauge({ + name: metricPrefix + 'recording_blob_ingestion_session_manager_count', + help: 'A gauge of the number of sessions being handled by this blob ingestion consumer', +}) + +const histogramKafkaBatchSize = new Histogram({ + name: metricPrefix + 'recording_blob_ingestion_kafka_batch_size', + help: 'The size of the batches we are receiving from Kafka', + buckets: [0, 50, 100, 150, 200, 250, 300, 350, 400, 450, 500, 600, Infinity], +}) + +const histogramKafkaBatchSizeKb = new Histogram({ + name: metricPrefix + 'recording_blob_ingestion_kafka_batch_size_kb', + help: 'The size in kb of the batches we are receiving from Kafka', + buckets: BUCKETS_KB_WRITTEN, +}) + +const counterKafkaMessageReceived = new Counter({ + name: metricPrefix + 'recording_blob_ingestion_kafka_message_received', + help: 'The number of messages we have received from Kafka', + labelNames: ['partition'], +}) + +export interface TeamIDWithConfig { + teamId: TeamId | null + consoleLogIngestionEnabled: boolean +} + +/** + * The SessionRecordingIngesterV3 + * relies on EFS network storage to avoid the need to delay kafka commits and instead uses the disk + * as the persistent volume for both blob data and the metadata around ingestion. + */ +export class SessionRecordingIngesterV3 { + // redisPool: RedisPool + sessions: Record = {} + // sessionHighWaterMarker: OffsetHighWaterMarker + // persistentHighWaterMarker: OffsetHighWaterMarker + // realtimeManager: RealtimeManager + // replayEventsIngester: ReplayEventsIngester + // consoleLogsIngester: ConsoleLogsIngester + batchConsumer?: BatchConsumer + // partitionMetrics: Record = {} + teamsRefresher: BackgroundRefresher> + // latestOffsetsRefresher: BackgroundRefresher> + config: PluginsServerConfig + topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS + // totalNumPartitions = 0 + + private promises: Set> = new Set() + // if ingestion is lagging on a single partition it is often hard to identify _why_, + // this allows us to output more information for that partition + private debugPartition: number | undefined = undefined + + constructor( + globalServerConfig: PluginsServerConfig, + private postgres: PostgresRouter, + private objectStorage: ObjectStorage + ) { + this.debugPartition = globalServerConfig.SESSION_RECORDING_DEBUG_PARTITION + ? parseInt(globalServerConfig.SESSION_RECORDING_DEBUG_PARTITION) + : undefined + + // NOTE: globalServerConfig contains the default pluginServer values, typically not pointing at dedicated resources like kafka or redis + // We still connect to some of the non-dedicated resources such as postgres or the Replay events kafka. 
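// `sessionRecordingConsumerConfig` derives the recording-specific consumer config from the
// shared server config (i.e. the dedicated resources mentioned above), and that derived
// config is what the rest of this class uses.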
+ this.config = sessionRecordingConsumerConfig(globalServerConfig) + // this.redisPool = createRedisPool(this.config) + + // NOTE: This is the only place where we need to use the shared server config + // TODO: Uncomment when we swap to using this service as the ingester for it + // this.replayEventsIngester = new ReplayEventsIngester(globalServerConfig, this.persistentHighWaterMarker) + // this.consoleLogsIngester = new ConsoleLogsIngester(globalServerConfig, this.persistentHighWaterMarker) + + this.teamsRefresher = new BackgroundRefresher(async () => { + try { + status.info('๐Ÿ”', 'session-replay-ingestion - refreshing teams in the background') + return await fetchTeamTokensWithRecordings(this.postgres) + } catch (e) { + status.error('๐Ÿ”ฅ', 'session-replay-ingestion - failed to refresh teams in the background', e) + captureException(e) + throw e + } + }) + } + + private get rootDir() { + return path.join(this.config.SESSION_RECORDING_LOCAL_DIRECTORY, 'session-recordings') + } + + private dirForSession(partition: number, teamId: number, sessionId: string): string { + return path.join(this.rootDir, `${partition}`, `${teamId}__${sessionId}`) + } + + private get connectedBatchConsumer(): KafkaConsumer | undefined { + // Helper to only use the batch consumer if we are actually connected to it - otherwise it will throw errors + const consumer = this.batchConsumer?.consumer + return consumer && consumer.isConnected() ? consumer : undefined + } + + private get assignedTopicPartitions(): TopicPartition[] { + return this.connectedBatchConsumer?.assignments() ?? [] + } + + private scheduleWork(promise: Promise): Promise { + /** + * Helper to handle graceful shutdowns. Every time we do some work we add a promise to this array and remove it when finished. + * That way when shutting down we can wait for all promises to finish before exiting. + */ + this.promises.add(promise) + promise.finally(() => this.promises.delete(promise)) + + return promise + } + + public async consume(event: IncomingRecordingMessage): Promise { + const { team_id, session_id } = event + const key = `${team_id}__${session_id}` + + const { offset, partition } = event.metadata + if (this.debugPartition === partition) { + status.info('๐Ÿ”', '[session-replay-ingestion] - [PARTITION DEBUG] - consuming event', { + team_id, + session_id, + partition, + offset, + }) + } + + if (!this.sessions[key]) { + const { partition } = event.metadata + + this.sessions[key] = await SessionManagerV3.create(this.config, this.objectStorage.s3, { + teamId: team_id, + sessionId: session_id, + dir: this.dirForSession(partition, team_id, session_id), + partition, + }) + } + + await this.sessions[key]?.add(event) + } + + public async handleEachBatch(messages: Message[]): Promise { + status.info('๐Ÿ”', `session-replay-ingestion - handling batch`, { + size: messages.length, + partitionsInBatch: [...new Set(messages.map((x) => x.partition))], + assignedPartitions: this.assignedTopicPartitions.map((x) => x.partition), + sessionsHandled: Object.keys(this.sessions).length, + }) + + // TODO: For all assigned partitions, load up any sessions on disk that we don't already have in memory + // TODO: Add a timer or something to fire this "handleEachBatch" with an empty batch for quite partitions + + await runInstrumentedFunction({ + statsKey: `recordingingester.handleEachBatch`, + logExecutionTime: true, + func: async () => { + histogramKafkaBatchSize.observe(messages.length) + histogramKafkaBatchSizeKb.observe(messages.reduce((acc, m) => (m.value?.length ?? 
0) + acc, 0) / 1024) + + const recordingMessages: IncomingRecordingMessage[] = [] + + await runInstrumentedFunction({ + statsKey: `recordingingester.handleEachBatch.parseKafkaMessages`, + func: async () => { + for (const message of messages) { + counterKafkaMessageReceived.inc({ partition: message.partition }) + + const recordingMessage = await parseKafkaMessage(message, (token) => + this.teamsRefresher.get().then((teams) => ({ + teamId: teams[token]?.teamId || null, + consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true, + })) + ) + + if (recordingMessage) { + recordingMessages.push(recordingMessage) + } + } + }, + }) + + // await this.reportPartitionMetrics() + + await runInstrumentedFunction({ + statsKey: `recordingingester.handleEachBatch.ensureSessionsAreLoaded`, + func: async () => { + await this.syncSessionsWithDisk() + }, + }) + + await runInstrumentedFunction({ + statsKey: `recordingingester.handleEachBatch.consumeBatch`, + func: async () => { + for (const message of recordingMessages) { + await this.consume(message) + } + }, + }) + + await runInstrumentedFunction({ + statsKey: `recordingingester.handleEachBatch.flushAllReadySessions`, + func: async () => { + await this.flushAllReadySessions() + }, + }) + + // await runInstrumentedFunction({ + // statsKey: `recordingingester.handleEachBatch.consumeReplayEvents`, + // func: async () => { + // await this.replayEventsIngester.consumeBatch(recordingMessages) + // }, + // }) + + // await runInstrumentedFunction({ + // statsKey: `recordingingester.handleEachBatch.consumeConsoleLogEvents`, + // func: async () => { + // await this.consoleLogsIngester.consumeBatch(recordingMessages) + // }, + // }) + }, + }) + } + + public async start(): Promise { + status.info('๐Ÿ”', 'session-replay-ingestion - starting session recordings blob consumer', { + librdKafkaVersion: librdkafkaVersion, + kafkaCapabilities: features, + }) + + this.setupHttpRoutes() + + // Load teams into memory + await this.teamsRefresher.refresh() + // await this.replayEventsIngester.start() + // await this.consoleLogsIngester.start() + + const connectionConfig = createRdConnectionConfigFromEnvVars(this.config) + + // Create a node-rdkafka consumer that fetches batches of messages, runs + // eachBatchWithContext, then commits offsets for the batch. + + this.batchConsumer = await startBatchConsumer({ + connectionConfig, + groupId: KAFKA_CONSUMER_GROUP_ID, + topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, + autoCommit: true, // NOTE: This is the crucial difference between this and the other consumer + sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS, + maxPollIntervalMs: this.config.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS, + // the largest size of a message that can be fetched by the consumer. 
+ // the largest size our MSK cluster allows is 20MB + // we only use 9 or 10MB but there's no reason to limit this ๐Ÿคท๏ธ + consumerMaxBytes: this.config.KAFKA_CONSUMPTION_MAX_BYTES, + consumerMaxBytesPerPartition: this.config.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION, + // our messages are very big, so we don't want to buffer too many + queuedMinMessages: this.config.SESSION_RECORDING_KAFKA_QUEUE_SIZE, + consumerMaxWaitMs: this.config.KAFKA_CONSUMPTION_MAX_WAIT_MS, + consumerErrorBackoffMs: this.config.KAFKA_CONSUMPTION_ERROR_BACKOFF_MS, + fetchBatchSize: this.config.SESSION_RECORDING_KAFKA_BATCH_SIZE, + batchingTimeoutMs: this.config.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS, + topicCreationTimeoutMs: this.config.KAFKA_TOPIC_CREATION_TIMEOUT_MS, + eachBatch: async (messages) => { + return await this.scheduleWork(this.handleEachBatch(messages)) + }, + }) + + addSentryBreadcrumbsEventListeners(this.batchConsumer.consumer) + + this.batchConsumer.consumer.on('disconnected', async (err) => { + // since we can't be guaranteed that the consumer will be stopped before some other code calls disconnect + // we need to listen to disconnect and make sure we're stopped + status.info('๐Ÿ”', 'session-replay-ingestion batch consumer disconnected, cleaning up', { err }) + await this.stop() + }) + } + + public async stop(): Promise[]> { + status.info('๐Ÿ”', 'session-replay-ingestion - stopping') + + // NOTE: We have to get the partitions before we stop the consumer as it throws if disconnected + // const assignedPartitions = this.assignedTopicPartitions + // Mark as stopping so that we don't actually process any more incoming messages, but still keep the process alive + await this.batchConsumer?.stop() + + void this.scheduleWork( + Promise.allSettled( + Object.entries(this.sessions).map(([key, sessionManager]) => this.destroySession(key, sessionManager)) + ) + ) + + // void this.scheduleWork(this.replayEventsIngester.stop()) + // void this.scheduleWork(this.consoleLogsIngester.stop()) + + const promiseResults = await Promise.allSettled(this.promises) + + // Finally we clear up redis once we are sure everything else has been handled + // await this.redisPool.drain() + // await this.redisPool.clear() + + status.info('๐Ÿ‘', 'session-replay-ingestion - stopped!') + + return promiseResults + } + + public isHealthy() { + // TODO: Maybe extend this to check if we are shutting down so we don't get killed early. 
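// A possible shape for that check (sketch only - `stopping` is a hypothetical flag that
// `stop()` would need to set before it starts draining in-flight work):
//   return this.stopping ? true : this.batchConsumer?.isHealthy()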
+ return this.batchConsumer?.isHealthy() + } + + async flushAllReadySessions(): Promise { + const promises: Promise[] = [] + const assignedPartitions = this.assignedTopicPartitions.map((x) => x.partition) + + for (const [key, sessionManager] of Object.entries(this.sessions)) { + if (!assignedPartitions.includes(sessionManager.context.partition)) { + promises.push(this.destroySession(key, sessionManager)) + continue + } + + const flushPromise = sessionManager + .flush() + .catch((err) => { + status.error( + '๐Ÿšฝ', + 'session-replay-ingestion - failed trying to flush on idle session: ' + + sessionManager.context.sessionId, + { + err, + session_id: sessionManager.context.sessionId, + } + ) + captureException(err, { tags: { session_id: sessionManager.context.sessionId } }) + }) + .then(async () => { + // If the SessionManager is done (flushed and with no more queued events) then we remove it to free up memory + if (await sessionManager.isEmpty()) { + await this.destroySession(key, sessionManager) + } + }) + promises.push(flushPromise) + } + await Promise.allSettled(promises) + gaugeSessionsHandled.set(Object.keys(this.sessions).length) + } + + private async syncSessionsWithDisk(): Promise { + // As we may get assigned and reassigned partitions, we want to make sure that we have all sessions loaded into memory + await Promise.all( + this.assignedTopicPartitions.map(async ({ partition }) => { + const keys = await readdir(path.join(this.rootDir, `${partition}`)).catch(() => { + // This happens if there are no files on disk for that partition yet + return [] + }) + + // TODO: Below regex is a little crude. We should fix it + await Promise.all( + keys + .filter((x) => /\d+__[a-zA-Z0-9\-]+/.test(x)) + .map(async (key) => { + // TODO: Ensure sessionId can only be a uuid + const [teamId, sessionId] = key.split('__') + + if (!this.sessions[key]) { + this.sessions[key] = await SessionManagerV3.create(this.config, this.objectStorage.s3, { + teamId: parseInt(teamId), + sessionId, + dir: this.dirForSession(partition, parseInt(teamId), sessionId), + partition, + }) + } + }) + ) + }) + ) + } + + private async destroySession(key: string, sessionManager: SessionManagerV3): Promise { + delete this.sessions[key] + await sessionManager.stop() + } + + private setupHttpRoutes() { + // Mimic the app sever's endpoint + expressApp.get('/api/projects/:projectId/session_recordings/:sessionId/snapshots', async (req, res) => { + const startTime = Date.now() + res.on('finish', function () { + status.info('โšก๏ธ', `GET ${req.url} - ${res.statusCode} - ${Date.now() - startTime}ms`) + }) + + // validate that projectId is a number and sessionId is UUID like + const projectId = parseInt(req.params.projectId) + if (isNaN(projectId)) { + res.sendStatus(404) + return + } + + const sessionId = req.params.sessionId + if (!/^[0-9a-f-]+$/.test(sessionId)) { + res.sendStatus(404) + return + } + + status.info('๐Ÿ”', 'session-replay-ingestion - fetching session', { projectId, sessionId }) + + // We don't know the partition upfront so we have to recursively check all partitions + const partitions = await readdir(this.rootDir).catch(() => []) + + for (const partition of partitions) { + const sessionDir = this.dirForSession(parseInt(partition), projectId, sessionId) + const exists = await stat(sessionDir).catch(() => null) + + if (!exists) { + continue + } + + const fileStream = createReadStream(path.join(sessionDir, BUFFER_FILE_NAME)) + fileStream.pipe(res) + return + } + + res.sendStatus(404) + }) + } +} diff --git 
a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index 97f13992909..f3a4362db6f 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -39,6 +39,7 @@ import { } from './ingestion-queues/on-event-handler-consumer' import { startScheduledTasksConsumer } from './ingestion-queues/scheduled-tasks-consumer' import { SessionRecordingIngester } from './ingestion-queues/session-recording/session-recordings-consumer' +import { SessionRecordingIngesterV3 } from './ingestion-queues/session-recording/session-recordings-consumer-v3' import { setupCommonRoutes } from './services/http-server' import { getObjectStorage } from './services/object_storage' @@ -451,6 +452,27 @@ export async function startPluginsServer( } } + if (capabilities.sessionRecordingV3Ingestion) { + const recordingConsumerConfig = sessionRecordingConsumerConfig(serverConfig) + const postgres = hub?.postgres ?? new PostgresRouter(serverConfig) + const s3 = hub?.objectStorage ?? getObjectStorage(recordingConsumerConfig) + + if (!s3) { + throw new Error("Can't start session recording ingestion without object storage") + } + // NOTE: We intentionally pass in the original serverConfig as the ingester uses both kafkas + const ingester = new SessionRecordingIngesterV3(serverConfig, postgres, s3) + await ingester.start() + + const batchConsumer = ingester.batchConsumer + + if (batchConsumer) { + stopSessionRecordingBlobConsumer = () => ingester.stop() + shutdownOnConsumerExit(batchConsumer) + healthChecks['session-recordings-ingestion'] = () => ingester.isHealthy() ?? false + } + } + if (capabilities.personOverrides) { const postgres = hub?.postgres ?? new PostgresRouter(serverConfig) const kafkaProducer = hub?.kafkaProducer ?? 
(await createKafkaProducerWrapper(serverConfig)) diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index be9b929643f..2445567668d 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -77,6 +77,7 @@ export enum PluginServerMode { scheduler = 'scheduler', analytics_ingestion = 'analytics-ingestion', recordings_blob_ingestion = 'recordings-blob-ingestion', + recordings_ingestion_v3 = 'recordings-ingestion-v3', person_overrides = 'person-overrides', } @@ -298,6 +299,7 @@ export interface PluginServerCapabilities { processAsyncOnEventHandlers?: boolean processAsyncWebhooksHandlers?: boolean sessionRecordingBlobIngestion?: boolean + sessionRecordingV3Ingestion?: boolean personOverrides?: boolean appManagementSingleton?: boolean preflightSchedules?: boolean // Used for instance health checks on hobby deploy, not useful on cloud @@ -950,117 +952,6 @@ export interface RawSessionReplayEvent { /* TODO what columns do we need */ } -export interface RawPerformanceEvent { - uuid: string - team_id: number - distinct_id: string - session_id: string - window_id: string - pageview_id: string - current_url: string - - // BASE_EVENT_COLUMNS - time_origin: number - timestamp: string - entry_type: string - name: string - - // RESOURCE_EVENT_COLUMNS - start_time: number - redirect_start: number - redirect_end: number - worker_start: number - fetch_start: number - domain_lookup_start: number - domain_lookup_end: number - connect_start: number - secure_connection_start: number - connect_end: number - request_start: number - response_start: number - response_end: number - decoded_body_size: number - encoded_body_size: number - duration: number - - initiator_type: string - next_hop_protocol: string - render_blocking_status: string - response_status: number - transfer_size: number - - // LARGEST_CONTENTFUL_PAINT_EVENT_COLUMNS - largest_contentful_paint_element: string - largest_contentful_paint_render_time: number - largest_contentful_paint_load_time: number - largest_contentful_paint_size: number - largest_contentful_paint_id: string - largest_contentful_paint_url: string - - // NAVIGATION_EVENT_COLUMNS - dom_complete: number - dom_content_loaded_event: number - dom_interactive: number - load_event_end: number - load_event_start: number - redirect_count: number - navigation_type: string - unload_event_end: number - unload_event_start: number -} - -export const PerformanceEventReverseMapping: { [key: number]: keyof RawPerformanceEvent } = { - // BASE_PERFORMANCE_EVENT_COLUMNS - 0: 'entry_type', - 1: 'time_origin', - 2: 'name', - - // RESOURCE_EVENT_COLUMNS - 3: 'start_time', - 4: 'redirect_start', - 5: 'redirect_end', - 6: 'worker_start', - 7: 'fetch_start', - 8: 'domain_lookup_start', - 9: 'domain_lookup_end', - 10: 'connect_start', - 11: 'secure_connection_start', - 12: 'connect_end', - 13: 'request_start', - 14: 'response_start', - 15: 'response_end', - 16: 'decoded_body_size', - 17: 'encoded_body_size', - 18: 'initiator_type', - 19: 'next_hop_protocol', - 20: 'render_blocking_status', - 21: 'response_status', - 22: 'transfer_size', - - // LARGEST_CONTENTFUL_PAINT_EVENT_COLUMNS - 23: 'largest_contentful_paint_element', - 24: 'largest_contentful_paint_render_time', - 25: 'largest_contentful_paint_load_time', - 26: 'largest_contentful_paint_size', - 27: 'largest_contentful_paint_id', - 28: 'largest_contentful_paint_url', - - // NAVIGATION_EVENT_COLUMNS - 29: 'dom_complete', - 30: 'dom_content_loaded_event', - 31: 'dom_interactive', - 32: 'load_event_end', - 33: 
'load_event_start', - 34: 'redirect_count', - 35: 'navigation_type', - 36: 'unload_event_end', - 37: 'unload_event_start', - - // Added after v1 - 39: 'duration', - 40: 'timestamp', -} - export enum TimestampFormat { ClickHouseSecondPrecision = 'clickhouse-second-precision', ClickHouse = 'clickhouse', diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/process-event.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/process-event.test.ts index 844d077be96..e59bcb58847 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/process-event.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/process-event.test.ts @@ -2,7 +2,6 @@ import { DateTime } from 'luxon' import { ConsoleLogEntry, - createPerformanceEvent, createSessionReplayEvent, gatherConsoleLogEvents, getTimestampsFrom, @@ -414,73 +413,4 @@ describe('session recording process event', () => { } satisfies ConsoleLogEntry, ]) }) - - it('performance event stored as performance_event', () => { - const data = createPerformanceEvent('some-id', 12345, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', { - // Taken from a real event from the JS - '0': 'resource', - '1': 1671723295836, - '2': 'http://localhost:8000/api/projects/1/session_recordings', - '3': 10737.89999999106, - '4': 0, - '5': 0, - '6': 0, - '7': 10737.89999999106, - '8': 10737.89999999106, - '9': 10737.89999999106, - '10': 10737.89999999106, - '11': 0, - '12': 10737.89999999106, - '13': 10745.09999999404, - '14': 11121.70000000298, - '15': 11122.20000000298, - '16': 73374, - '17': 1767, - '18': 'fetch', - '19': 'http/1.1', - '20': 'non-blocking', - '22': 2067, - '39': 384.30000001192093, - '40': 1671723306573, - token: 'phc_234', - $session_id: '1853a793ad26c1-0eea05631cbeff-17525635-384000-1853a793ad31dd2', - $window_id: '1853a793ad424a5-017f7473b057f1-17525635-384000-1853a793ad524dc', - distinct_id: '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', - $current_url: 'http://localhost:8000/recordings/recent', - }) - - expect(data).toEqual({ - connect_end: 10737.89999999106, - connect_start: 10737.89999999106, - current_url: 'http://localhost:8000/recordings/recent', - decoded_body_size: 73374, - distinct_id: '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', - domain_lookup_end: 10737.89999999106, - domain_lookup_start: 10737.89999999106, - duration: 384.30000001192093, - encoded_body_size: 1767, - entry_type: 'resource', - fetch_start: 10737.89999999106, - initiator_type: 'fetch', - name: 'http://localhost:8000/api/projects/1/session_recordings', - next_hop_protocol: 'http/1.1', - pageview_id: undefined, - redirect_end: 0, - redirect_start: 0, - render_blocking_status: 'non-blocking', - request_start: 10745.09999999404, - response_end: 11122.20000000298, - response_start: 11121.70000000298, - secure_connection_start: 0, - session_id: '1853a793ad26c1-0eea05631cbeff-17525635-384000-1853a793ad31dd2', - start_time: 10737.89999999106, - team_id: 12345, - time_origin: 1671723295836, - timestamp: 1671723306573, - transfer_size: 2067, - uuid: 'some-id', - window_id: '1853a793ad424a5-017f7473b057f1-17525635-384000-1853a793ad524dc', - worker_start: 0, - }) - }) }) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/services/session-manager-v3.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/services/session-manager-v3.test.ts new file mode 100644 index 00000000000..3bd07dcce9e --- /dev/null +++ 
b/plugin-server/tests/main/ingestion-queues/session-recording/services/session-manager-v3.test.ts @@ -0,0 +1,251 @@ +import { Upload } from '@aws-sdk/lib-storage' +import { randomUUID } from 'crypto' +import fs from 'fs/promises' +import { DateTime, Settings } from 'luxon' +import path from 'path' +import { PassThrough } from 'stream' +import * as zlib from 'zlib' + +import { defaultConfig } from '../../../../../src/config/config' +import { SessionManagerV3 } from '../../../../../src/main/ingestion-queues/session-recording/services/session-manager-v3' +import { now } from '../../../../../src/main/ingestion-queues/session-recording/utils' +import { createIncomingRecordingMessage } from '../fixtures' + +jest.mock('@aws-sdk/lib-storage', () => { + const mockUpload = jest.fn().mockImplementation(() => { + return { + abort: jest.fn().mockResolvedValue(undefined), + done: jest.fn().mockResolvedValue(undefined), + } + }) + + return { + __esModule: true, + Upload: mockUpload, + } +}) + +const tmpDir = path.join(__dirname, '../../../../../.tmp/test_session_recordings') + +describe('session-manager', () => { + jest.setTimeout(1000) + let sessionManager: SessionManagerV3 + const mockS3Client: any = { + send: jest.fn(), + } + + const createSessionManager = async ( + sessionId = randomUUID(), + teamId = 1, + partition = 1 + ): Promise => { + return await SessionManagerV3.create(defaultConfig, mockS3Client, { + sessionId, + teamId, + partition, + dir: path.join(tmpDir, `${partition}`, `${teamId}__${sessionId}`), + }) + } + + const flushThreshold = defaultConfig.SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS * 1000 + + beforeEach(async () => { + // it's always May 25 + Settings.now = () => new Date(2018, 4, 25).valueOf() + + if (await fs.stat(tmpDir).catch(() => null)) { + await fs.rmdir(tmpDir, { recursive: true }) + } + + sessionManager = await createSessionManager() + }) + + afterEach(async () => { + await sessionManager?.stop() + // it's no longer always May 25 + Settings.now = () => new Date().valueOf() + }) + + it('adds a message', async () => { + const timestamp = now() + const event = createIncomingRecordingMessage({ + events: [ + { timestamp: timestamp, type: 4, data: { href: 'http://localhost:3001/' } }, + { timestamp: timestamp + 1000, type: 4, data: { href: 'http://localhost:3001/' } }, + ], + }) + + await sessionManager.add(event) + + expect(sessionManager.buffer?.context).toEqual({ + sizeEstimate: 193, + count: 1, + eventsRange: { firstTimestamp: timestamp, lastTimestamp: timestamp + 1000 }, + createdAt: timestamp, + }) + + const stats = await fs.stat(`${sessionManager.context.dir}/buffer.jsonl`) + expect(stats.size).toBeGreaterThan(0) + }) + + it('does not flush if it has received a message recently', async () => { + const now = DateTime.now() + + const event = createIncomingRecordingMessage({ + metadata: { + timestamp: now, + } as any, + }) + + await sessionManager.add(event) + await sessionManager.flush() + + expect(await fs.readdir(sessionManager.context.dir)).toEqual(['buffer.jsonl', 'metadata.json']) + }) + + it('does flush if the stored file is older than the threshold', async () => { + const firstTimestamp = 1700000000000 + const lastTimestamp = 1700000000000 + 4000 + + const eventOne = createIncomingRecordingMessage({ + events: [{ timestamp: firstTimestamp, type: 4, data: { href: 'http://localhost:3001/' } }], + }) + const eventTwo = createIncomingRecordingMessage({ + events: [{ timestamp: lastTimestamp, type: 4, data: { href: 'http://localhost:3001/' } }], + }) + + await 
sessionManager.add(eventOne) + await sessionManager.add(eventTwo) + + sessionManager.buffer!.context.createdAt = now() - flushThreshold - 1 + + await sessionManager.flush() + + expect(await fs.readdir(sessionManager.context.dir)).toEqual([]) + + const sessionId = sessionManager.context.sessionId + + // as a proxy for flush having been called or not + const mockUploadCalls = (Upload as unknown as jest.Mock).mock.calls + expect(mockUploadCalls.length).toBe(1) + expect(mockUploadCalls[0].length).toBe(1) + expect(mockUploadCalls[0][0]).toEqual( + expect.objectContaining({ + params: expect.objectContaining({ + Key: `session_recordings/team_id/1/session_id/${sessionId}/data/${firstTimestamp}-${lastTimestamp}.jsonl`, + }), + }) + ) + }) + + it('has a fixed jitter based on the serverConfig', async () => { + const minJitter = 1 - defaultConfig.SESSION_RECORDING_BUFFER_AGE_JITTER + for (const _ of Array(100).keys()) { + const sm = await createSessionManager() + expect(sm.flushJitterMultiplier).toBeGreaterThanOrEqual(minJitter) + expect(sm.flushJitterMultiplier).toBeLessThanOrEqual(1) + } + }) + + it('not remove files when stopped', async () => { + expect(await fs.readdir(sessionManager.context.dir)).toEqual([]) + await sessionManager.add(createIncomingRecordingMessage()) + expect(await fs.readdir(sessionManager.context.dir)).toEqual(['buffer.jsonl', 'metadata.json']) + await sessionManager.stop() + expect(await fs.readdir(sessionManager.context.dir)).toEqual(['buffer.jsonl', 'metadata.json']) + }) + + it('removes the directly when stopped after fully flushed', async () => { + const sm = await createSessionManager('session_id_2', 2, 2) + expect(await fs.readdir(sm.context.dir)).toEqual([]) + await sm.add(createIncomingRecordingMessage()) + expect(await fs.readdir(sm.context.dir)).toEqual(['buffer.jsonl', 'metadata.json']) + await sm.flush(true) + expect(await fs.readdir(sm.context.dir)).toEqual([]) + await sm.stop() + // ;(sessionManager as any) = undefined // Stop the afterEach from failing + + await expect(fs.stat(sm.context.dir)).rejects.toThrowError('ENOENT: no such file or directory') + }) + + it('reads successfully with the stream not closed', async () => { + const event = createIncomingRecordingMessage({ + events: [ + { timestamp: 170000000, type: 4, data: { href: 'http://localhost:3001/' } }, + { timestamp: 170000000 + 1000, type: 4, data: { href: 'http://localhost:3001/' } }, + ], + }) + await sessionManager.add(event) + + const content = await fs.readFile(`${sessionManager.context.dir}/buffer.jsonl`, 'utf-8') + expect(content).toEqual( + '{"window_id":"window_id_1","data":[{"timestamp":170000000,"type":4,"data":{"href":"http://localhost:3001/"}},{"timestamp":170001000,"type":4,"data":{"href":"http://localhost:3001/"}}]}\n' + ) + }) + + it('adds to the existing buffer when restarted', async () => { + const sm1 = await createSessionManager('session_id_2', 2, 2) + + await sm1.add( + createIncomingRecordingMessage({ + events: [ + { timestamp: 170000000, type: 4, data: { href: 'http://localhost:3001/' } }, + { timestamp: 170000000 + 1000, type: 4, data: { href: 'http://localhost:3001/' } }, + ], + }) + ) + + const sm2 = await createSessionManager('session_id_2', 2, 2) + await sm2.add( + createIncomingRecordingMessage({ + events: [ + { timestamp: 170000000 + 2000, type: 4, data: { href: 'http://localhost:3001/' } }, + { timestamp: 170000000 + 3000, type: 4, data: { href: 'http://localhost:3001/' } }, + ], + }) + ) + + const buffer = await fs.readFile(`${sm1.context.dir}/buffer.jsonl`, 'utf-8') + 
expect(buffer).toEqual( + '{"window_id":"window_id_1","data":[{"timestamp":170000000,"type":4,"data":{"href":"http://localhost:3001/"}},{"timestamp":170001000,"type":4,"data":{"href":"http://localhost:3001/"}}]}\n{"window_id":"window_id_1","data":[{"timestamp":170002000,"type":4,"data":{"href":"http://localhost:3001/"}},{"timestamp":170003000,"type":4,"data":{"href":"http://localhost:3001/"}}]}\n' + ) + + await sm1.stop() + await sm2.stop() + }) + + it('uploads a gzip compressed file to S3', async () => { + await sessionManager.add( + createIncomingRecordingMessage({ + events: [{ timestamp: 170000000, type: 4, data: { href: 'http://localhost:3001/' } }], + }) + ) + await sessionManager.flush(true) + + const mockUploadCalls = (Upload as unknown as jest.Mock).mock.calls + expect(mockUploadCalls[0][0]).toMatchObject({ + params: { + Bucket: 'posthog', + Key: `session_recordings/team_id/1/session_id/${sessionManager.context.sessionId}/data/170000000-170000000.jsonl`, + ContentEncoding: 'gzip', + ContentType: 'application/json', + }, + }) + const uploadBody = mockUploadCalls[0][0].params.Body + + expect(uploadBody).toBeInstanceOf(PassThrough) + + // Extract the content from the stream and gzip decompress it + const chunks: Uint8Array[] = [] + for await (const chunk of uploadBody) { + chunks.push(chunk) + } + + const buffer = Buffer.concat(chunks) + const uncompressed = zlib.gunzipSync(buffer).toString('utf-8') + + expect(uncompressed).toEqual( + '{"window_id":"window_id_1","data":[{"timestamp":170000000,"type":4,"data":{"href":"http://localhost:3001/"}}]}\n' + ) + }) +}) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts new file mode 100644 index 00000000000..c6bd73de63d --- /dev/null +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts @@ -0,0 +1,295 @@ +import { randomUUID } from 'crypto' +import fs from 'fs/promises' +import { mkdirSync, rmSync } from 'node:fs' +import { TopicPartition, TopicPartitionOffset } from 'node-rdkafka' +import path from 'path' + +import { waitForExpect } from '../../../../functional_tests/expectations' +import { defaultConfig } from '../../../../src/config/config' +import { + SessionManagerBufferContext, + SessionManagerContext, +} from '../../../../src/main/ingestion-queues/session-recording/services/session-manager-v3' +import { SessionRecordingIngesterV3 } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-consumer-v3' +import { Hub, PluginsServerConfig, Team } from '../../../../src/types' +import { createHub } from '../../../../src/utils/db/hub' +import { getFirstTeam, resetTestDatabase } from '../../../helpers/sql' +import { createIncomingRecordingMessage, createKafkaMessage, createTP } from './fixtures' + +const SESSION_RECORDING_REDIS_PREFIX = '@posthog-tests/replay/' + +const tmpDir = path.join(__dirname, '../../../../.tmp/test_session_recordings') + +const config: PluginsServerConfig = { + ...defaultConfig, + SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION: true, + SESSION_RECORDING_REDIS_PREFIX, + SESSION_RECORDING_LOCAL_DIRECTORY: tmpDir, +} + +async function deleteKeysWithPrefix(hub: Hub) { + const redisClient = await hub.redisPool.acquire() + const keys = await redisClient.keys(`${SESSION_RECORDING_REDIS_PREFIX}*`) + const pipeline = redisClient.pipeline() + keys.forEach(function (key) { + pipeline.del(key) + }) 
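    // Execute every queued DEL in a single round trip before handing the connection back to the pool.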
+    await pipeline.exec()
+    await hub.redisPool.release(redisClient)
+}
+
+const mockConsumer = {
+    on: jest.fn(),
+    commitSync: jest.fn(),
+    commit: jest.fn(),
+    queryWatermarkOffsets: jest.fn(),
+    committed: jest.fn(),
+    assignments: jest.fn(),
+    isConnected: jest.fn(() => true),
+    getMetadata: jest.fn(),
+}
+
+jest.mock('../../../../src/kafka/batch-consumer', () => {
+    return {
+        startBatchConsumer: jest.fn(() =>
+            Promise.resolve({
+                join: () => ({
+                    finally: jest.fn(),
+                }),
+                stop: jest.fn(),
+                consumer: mockConsumer,
+            })
+        ),
+    }
+})
+
+jest.setTimeout(1000)
+
+describe('ingester', () => {
+    let ingester: SessionRecordingIngesterV3
+
+    let hub: Hub
+    let closeHub: () => Promise<void>
+    let team: Team
+    let teamToken = ''
+    let mockOffsets: Record<number, number> = {}
+    let mockCommittedOffsets: Record<number, number> = {}
+
+    beforeAll(async () => {
+        mkdirSync(path.join(config.SESSION_RECORDING_LOCAL_DIRECTORY, 'session-buffer-files'), { recursive: true })
+        await resetTestDatabase()
+    })
+
+    beforeEach(async () => {
+        if (await fs.stat(tmpDir).catch(() => null)) {
+            await fs.rmdir(tmpDir, { recursive: true })
+        }
+
+        // The below mocks simulate committing to Kafka and querying the offsets
+        mockCommittedOffsets = {}
+        mockOffsets = {}
+        mockConsumer.commit.mockImplementation(
+            (tpo: TopicPartitionOffset) => (mockCommittedOffsets[tpo.partition] = tpo.offset)
+        )
+        mockConsumer.queryWatermarkOffsets.mockImplementation((_topic, partition, _timeout, cb) => {
+            cb(null, { highOffset: mockOffsets[partition] ?? 1, lowOffset: 0 })
+        })
+
+        mockConsumer.getMetadata.mockImplementation((options, cb) => {
+            cb(null, {
+                topics: [{ name: options.topic, partitions: [{ id: 0 }, { id: 1 }, { id: 2 }] }],
+            })
+        })
+
+        mockConsumer.committed.mockImplementation((topicPartitions: TopicPartition[], _timeout, cb) => {
+            const tpos: TopicPartitionOffset[] = topicPartitions.map((tp) => ({
+                topic: tp.topic,
+                partition: tp.partition,
+                offset: mockCommittedOffsets[tp.partition] ?? 1,
+            }))
+
+            cb(null, tpos)
+        })
+        ;[hub, closeHub] = await createHub()
+        team = await getFirstTeam(hub)
+        teamToken = team.api_token
+        await deleteKeysWithPrefix(hub)
+
+        ingester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage)
+        await ingester.start()
+
+        mockConsumer.assignments.mockImplementation(() => [createTP(0), createTP(1)])
+    })
+
+    afterEach(async () => {
+        jest.setTimeout(10000)
+        await deleteKeysWithPrefix(hub)
+        await ingester.stop()
+        await closeHub()
+    })
+
+    afterAll(() => {
+        rmSync(config.SESSION_RECORDING_LOCAL_DIRECTORY, { recursive: true, force: true })
+        jest.useRealTimers()
+    })
+
+    const createMessage = (session_id: string, partition = 1) => {
+        mockOffsets[partition] = mockOffsets[partition] ?? 0
+        mockOffsets[partition]++
+
+        return createKafkaMessage(
+            teamToken,
+            {
+                partition,
+                offset: mockOffsets[partition],
+            },
+            {
+                $session_id: session_id,
+            }
+        )
+    }
+
+    it('can parse debug partition config', () => {
+        const config = {
+            SESSION_RECORDING_DEBUG_PARTITION: '103',
+            KAFKA_HOSTS: 'localhost:9092',
+        } satisfies Partial<PluginsServerConfig> as PluginsServerConfig
+
+        const ingester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage)
+        expect(ingester['debugPartition']).toEqual(103)
+    })
+
+    it('can parse absence of debug partition config', () => {
+        const config = {
+            KAFKA_HOSTS: 'localhost:9092',
+        } satisfies Partial<PluginsServerConfig> as PluginsServerConfig
+
+        const ingester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage)
+        expect(ingester['debugPartition']).toBeUndefined()
+    })
+
+    it('creates a new session manager if needed', async () => {
+        const event = createIncomingRecordingMessage()
+        await ingester.consume(event)
+        await waitForExpect(() => {
+            expect(Object.keys(ingester.sessions).length).toBe(1)
+            expect(ingester.sessions['1__session_id_1']).toBeDefined()
+        })
+    })
+
+    it('handles multiple incoming sessions', async () => {
+        const event = createIncomingRecordingMessage()
+        const event2 = createIncomingRecordingMessage({
+            session_id: 'session_id_2',
+        })
+        await Promise.all([ingester.consume(event), ingester.consume(event2)])
+        expect(Object.keys(ingester.sessions).length).toBe(2)
+        expect(ingester.sessions['1__session_id_1']).toBeDefined()
+        expect(ingester.sessions['1__session_id_2']).toBeDefined()
+    })
+
+    it('destroys a session manager if finished', async () => {
+        const sessionId = `destroys-a-session-manager-if-finished-${randomUUID()}`
+        const event = createIncomingRecordingMessage({
+            session_id: sessionId,
+        })
+        await ingester.consume(event)
+        expect(ingester.sessions[`1__${sessionId}`]).toBeDefined()
+        ingester.sessions[`1__${sessionId}`].buffer!.context.createdAt = 0
+
+        await ingester.flushAllReadySessions()
+
+        await waitForExpect(() => {
+            expect(ingester.sessions[`1__${sessionId}`]).not.toBeDefined()
+        }, 10000)
+    })
+
+    describe('simulated rebalancing', () => {
+        let otherIngester: SessionRecordingIngesterV3
+        jest.setTimeout(5000) // Increased to cover lock delay
+
+        beforeEach(async () => {
+            otherIngester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage)
+            await otherIngester.start()
+        })
+
+        afterEach(async () => {
+            await otherIngester.stop()
+        })
+
+        const getSessions = (
+            ingester: SessionRecordingIngesterV3
+        ): (SessionManagerContext & SessionManagerBufferContext)[] =>
+            Object.values(ingester.sessions).map((x) => ({ ...x.context, ...x.buffer!.context }))
+
+        /**
+         * It is really hard to do rebalance tests against real Kafka, so we instead simulate the relevant consumer methods and ensure the correct logic occurs.
+         * Simulates the rebalance and tests that the affected sessions are successfully dropped by one consumer and picked up by the other.
+         */
+        it('rebalances new consumers', async () => {
+            const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)]
+            const partitionMsgs2 = [createMessage('session_id_3', 2), createMessage('session_id_4', 2)]
+
+            mockConsumer.assignments.mockImplementation(() => [createTP(1), createTP(2), createTP(3)])
+            await ingester.handleEachBatch([...partitionMsgs1, ...partitionMsgs2])
+
+            expect(getSessions(ingester)).toMatchObject([
+                { sessionId: 'session_id_1', partition: 1, count: 1 },
+                { sessionId: 'session_id_2', partition: 1, count: 1 },
+                { sessionId: 'session_id_3', partition: 2, count: 1 },
+                { sessionId: 'session_id_4', partition: 2, count: 1 },
+            ])
+
+            // Call handleEachBatch with both consumers - we simulate the assignments, which
+            // are what drive the actual syncing of the sessions
+            mockConsumer.assignments.mockImplementation(() => [createTP(2), createTP(3)])
+            await otherIngester.handleEachBatch([createMessage('session_id_4', 2), createMessage('session_id_5', 2)])
+            mockConsumer.assignments.mockImplementation(() => [createTP(1)])
+            await ingester.handleEachBatch([createMessage('session_id_1', 1)])
+
+            // Should still have the partition 1 sessions that didn't move, with added events
+            expect(getSessions(ingester)).toMatchObject([
+                { sessionId: 'session_id_1', partition: 1, count: 2 },
+                { sessionId: 'session_id_2', partition: 1, count: 1 },
+            ])
+            expect(getSessions(otherIngester)).toMatchObject([
+                { sessionId: 'session_id_3', partition: 2, count: 1 },
+                { sessionId: 'session_id_4', partition: 2, count: 2 },
+                { sessionId: 'session_id_5', partition: 2, count: 1 },
+            ])
+        })
+    })
+
+    describe('stop()', () => {
+        const setup = async (): Promise<void> => {
+            const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)]
+            await ingester.handleEachBatch(partitionMsgs1)
+        }
+
+        // TODO: Unskip when we add back in the replay and console ingestion
+        it('shuts down without error', async () => {
+            await setup()
+
+            await expect(ingester.stop()).resolves.toMatchObject([
+                // destroy sessions,
+                { status: 'fulfilled' },
+                // // stop replay ingester
+                // { status: 'fulfilled' },
+                // // stop console ingester
+                // { status: 'fulfilled' },
+            ])
+        })
+    })
+
+    describe('when a team is disabled', () => {
+        it('ignores invalid teams', async () => {
+            // non-zero offset because the code can't commit offset 0
+            await ingester.handleEachBatch([
+                createKafkaMessage('invalid_token', { offset: 12 }),
+                createKafkaMessage('invalid_token', { offset: 13 }),
+            ])
+
+            expect(ingester.sessions).toEqual({})
+        })
+    })
+})
diff --git a/posthog/session_recordings/session_recording_api.py b/posthog/session_recordings/session_recording_api.py
index 10af0dd71a0..73147fe4d4c 100644
--- a/posthog/session_recordings/session_recording_api.py
+++ b/posthog/session_recordings/session_recording_api.py
@@ -342,7 +342,10 @@ class SessionRecordingViewSet(TeamAndOrgViewSetMixin, viewsets.GenericViewSet):
         for full_key in blob_keys:
             # Keys are like 1619712000-1619712060
             blob_key = full_key.replace(blob_prefix.rstrip("/") + "/", "")
-            time_range = [datetime.fromtimestamp(int(x) / 1000, tz=timezone.utc) for x in blob_key.split("-")]
+            blob_key_base = blob_key.split(".")[0]  # Remove the extension if it exists
+            time_range = [
+                datetime.fromtimestamp(int(x) / 1000, tz=timezone.utc) for x in blob_key_base.split("-")
+            ]

             sources.append(
                 {
@@ -378,7 +381,19 @@ class SessionRecordingViewSet(TeamAndOrgViewSetMixin, viewsets.GenericViewSet):
             response_data["sources"] = sources

         elif source == "realtime":
-            snapshots = get_realtime_snapshots(team_id=self.team.pk, session_id=str(recording.session_id)) or []
+            if request.GET.get("version", None) == "3" and settings.RECORDINGS_INGESTER_URL:
+                with requests.get(
+                    url=f"{settings.RECORDINGS_INGESTER_URL}/api/projects/{self.team.pk}/session_recordings/{str(recording.session_id)}/snapshots",
+                    stream=True,
+                ) as r:
+                    if r.status_code == 404:
+                        return Response({"snapshots": []})
+
+                    response = HttpResponse(content=r.raw, content_type="application/json")
+                    response["Content-Disposition"] = "inline"
+                    return response
+            else:
+                snapshots = get_realtime_snapshots(team_id=self.team.pk, session_id=str(recording.session_id)) or []

             event_properties["source"] = "realtime"
             event_properties["snapshots_length"] = len(snapshots)
diff --git a/posthog/settings/session_replay.py b/posthog/settings/session_replay.py
index 943e4466e82..7c349c19403 100644
--- a/posthog/settings/session_replay.py
+++ b/posthog/settings/session_replay.py
@@ -23,3 +23,5 @@ REPLAY_EMBEDDINGS_CALCULATION_CELERY_INTERVAL_SECONDS = get_from_env(
 )

 REPLAY_EMBEDDINGS_ALLOWED_TEAMS: List[str] = get_list(get_from_env("REPLAY_EMBEDDINGS_ALLOWED_TEAM", "", type_cast=str))
+
+RECORDINGS_INGESTER_URL = get_from_env("RECORDINGS_INGESTER_URL", "")
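For context on how the pieces above fit together: when a client requests snapshots with source=realtime and version=3, the Django view proxies the request straight to the recordings ingester at RECORDINGS_INGESTER_URL (http://localhost:6738 by default in bin/start and launch.json). Below is a minimal sketch, not part of the patch, of querying that ingester endpoint directly. The helper name and the example team/session ids are hypothetical, and the assumption that the body is newline-delimited JSON is inferred from the .jsonl buffers exercised in the session-manager tests above.

# Minimal sketch (hypothetical helper): query the v3 recordings ingester directly,
# mirroring the proxying done by session_recording_api.py for source=realtime&version=3.
import requests

RECORDINGS_INGESTER_URL = "http://localhost:6738"  # default dev value from bin/start


def fetch_v3_realtime_snapshots(team_id: int, session_id: str) -> list[str]:
    # URL shape taken from the view above; the ids passed in are example values
    url = f"{RECORDINGS_INGESTER_URL}/api/projects/{team_id}/session_recordings/{session_id}/snapshots"
    with requests.get(url, stream=True) as r:
        if r.status_code == 404:
            return []  # the ingester has nothing buffered for this session yet
        r.raise_for_status()
        # Assumed to be newline-delimited JSON, matching the .jsonl buffers written by session-manager-v3
        return [line for line in r.iter_lines(decode_unicode=True) if line]


if __name__ == "__main__":
    print(fetch_v3_realtime_snapshots(1, "example-session-id"))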