From 13234492937013e1c285b89d37cf39430853b330 Mon Sep 17 00:00:00 2001
From: Ben White
Date: Mon, 6 May 2024 09:15:25 +0200
Subject: [PATCH] chore: Remove experimental replay ingester (#21969)

---
 plugin-server/src/capabilities.ts           |   5 -
 .../services/session-manager-v3.ts          | 526 ------------------
 .../session-recordings-consumer-v3.ts       | 525 -----------------
 .../session-recording/utils.ts              |  47 --
 plugin-server/src/main/pluginsServer.ts     |  22 -
 plugin-server/src/types.ts                  |   2 -
 .../__snapshots__/utils.test.ts.snap        |  98 ----
 .../services/session-manager-v3.test.ts     | 319 -----------
 .../session-recordings-consumer-v3.test.ts  | 369 ------------
 .../session-recording/utils.test.ts         |  63 ---
 10 files changed, 1976 deletions(-)
 delete mode 100644 plugin-server/src/main/ingestion-queues/session-recording/services/session-manager-v3.ts
 delete mode 100644 plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts
 delete mode 100644 plugin-server/tests/main/ingestion-queues/session-recording/services/session-manager-v3.test.ts
 delete mode 100644 plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts

diff --git a/plugin-server/src/capabilities.ts b/plugin-server/src/capabilities.ts
index caa5b8f576a..cda8da5b20a 100644
--- a/plugin-server/src/capabilities.ts
+++ b/plugin-server/src/capabilities.ts
@@ -61,11 +61,6 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
                 sessionRecordingBlobOverflowIngestion: true,
                 ...sharedCapabilities,
             }
-        case PluginServerMode.recordings_ingestion_v3:
-            return {
-                sessionRecordingV3Ingestion: true,
-                ...sharedCapabilities,
-            }
         case PluginServerMode.async_onevent:
             return {
                 processAsyncOnEventHandlers: true,
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager-v3.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager-v3.ts
deleted file mode 100644
index 8eb18b248d8..00000000000
--- a/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager-v3.ts
+++ /dev/null
@@ -1,526 +0,0 @@
-import { Upload } from '@aws-sdk/lib-storage'
-import { captureException, captureMessage } from '@sentry/node'
-import { createReadStream, createWriteStream, WriteStream } from 'fs'
-import { mkdir, readdir, readFile, rename, rmdir, stat, unlink, writeFile } from 'fs/promises'
-import path from 'path'
-import { Counter, Histogram } from 'prom-client'
-import { PassThrough } from 'stream'
-import { pipeline } from 'stream/promises'
-import * as zlib from 'zlib'
-
-import { PluginsServerConfig } from '../../../../types'
-import { status } from '../../../../utils/status'
-import { asyncTimeoutGuard } from '../../../../utils/timing'
-import { ObjectStorage } from '../../../services/object_storage'
-import { IncomingRecordingMessage } from '../types'
-import { convertForPersistence, maxDefined, minDefined, now } from '../utils'
-
-const BUCKETS_LINES_WRITTEN = [0, 10, 50, 100, 500, 1000, 2000, 5000, 10000, Infinity]
-export const BUCKETS_KB_WRITTEN = [0, 128, 512, 1024, 5120, 10240, 20480, 51200, 102400, 204800, Infinity]
-const S3_UPLOAD_WARN_TIME_SECONDS = 2 * 60 * 1000
-
-// NOTE: To remove once released
-const metricPrefix = 'v3_'
-
-export const FILE_EXTENSION = '.jsonl'
-export const BUFFER_FILE_NAME = `buffer${FILE_EXTENSION}`
-export const FLUSH_FILE_EXTENSION = `.flush${FILE_EXTENSION}`
-export const METADATA_FILE_NAME = `metadata.json`
-
-const writeStreamBlocked = new Counter({
-    name:
metricPrefix + 'recording_blob_ingestion_write_stream_blocked', - help: 'Number of times we get blocked by the stream backpressure', -}) - -const counterS3FilesWritten = new Counter({ - name: metricPrefix + 'recording_s3_files_written', - help: 'A single file flushed to S3', -}) - -const counterS3WriteErrored = new Counter({ - name: metricPrefix + 'recording_s3_write_errored', - help: 'Indicates that we failed to flush to S3 without recovering', -}) - -const bufferLoadFailedCounter = new Counter({ - name: metricPrefix + 'recording_load_from_file_failed', - help: 'Indicates that we failed to load the file from disk', -}) - -const histogramS3LinesWritten = new Histogram({ - name: metricPrefix + 'recording_s3_lines_written_histogram', - help: 'The number of lines in a file we send to s3', - buckets: BUCKETS_LINES_WRITTEN, -}) - -const histogramS3KbWritten = new Histogram({ - name: metricPrefix + 'recording_blob_ingestion_s3_kb_written', - help: 'The uncompressed size of file we send to S3', - buckets: BUCKETS_KB_WRITTEN, -}) - -const histogramSessionAgeSeconds = new Histogram({ - name: metricPrefix + 'recording_blob_ingestion_session_age_seconds', - help: 'The age of current sessions in seconds', - buckets: [0, 60, 60 * 2, 60 * 5, 60 * 8, 60 * 10, 60 * 12, 60 * 15, 60 * 20, Infinity], -}) - -const histogramSessionSizeKb = new Histogram({ - name: metricPrefix + 'recording_blob_ingestion_session_size_kb', - help: 'The size of current sessions in kb', - buckets: BUCKETS_KB_WRITTEN, -}) - -const histogramFlushTimeSeconds = new Histogram({ - name: metricPrefix + 'recording_blob_ingestion_session_flush_time_seconds', - help: 'The time taken to flush a session in seconds', - buckets: [0, 2, 5, 10, 20, 30, 60, 120, 180, 300, Infinity], -}) - -const histogramSessionSize = new Histogram({ - name: metricPrefix + 'recording_blob_ingestion_session_lines', - help: 'The size of sessions in numbers of lines', - buckets: BUCKETS_LINES_WRITTEN, -}) - -const histogramBackpressureBlockedSeconds = new Histogram({ - name: metricPrefix + 'recording_blob_ingestion_backpressure_blocked_seconds', - help: 'The time taken to flush a session in seconds', - buckets: [0, 2, 5, 10, 20, 30, 60, 120, 180, 300, Infinity], -}) - -export type SessionManagerBufferContext = { - sizeEstimate: number - count: number - eventsRange: { - firstTimestamp: number - lastTimestamp: number - } | null - createdAt: number -} - -// Context that is updated and persisted to disk so must be serializable -export type SessionManagerContext = { - dir: string - sessionId: string - teamId: number - partition: number -} - -export class SessionManagerV3 { - buffer?: SessionManagerBufferContext - bufferWriteStream?: WriteStream - - flushPromise?: Promise - destroying = false - inProgressUpload: Upload | null = null - flushJitterMultiplier: number - - readonly setupPromise: Promise - - constructor( - public readonly serverConfig: PluginsServerConfig, - public readonly s3Client: ObjectStorage['s3'], - public readonly context: SessionManagerContext - ) { - // We add a jitter multiplier to the buffer age so that we don't have all sessions flush at the same time - this.flushJitterMultiplier = 1 - Math.random() * serverConfig.SESSION_RECORDING_BUFFER_AGE_JITTER - this.setupPromise = this.setup() - } - - private file(name: string): string { - return path.join(this.context.dir, name) - } - - private async setup(): Promise { - await mkdir(this.context.dir, { recursive: true }) - - const bufferFileExists = await stat(this.file(BUFFER_FILE_NAME)) - .then(() => true) 
- .catch(() => false) - - let metadataFileContent: string | undefined - let context: SessionManagerBufferContext | undefined - - if (!bufferFileExists) { - status.info('๐Ÿ“ฆ', '[session-manager] started new manager', { - ...this.context, - ...(this.buffer ?? {}), - }) - return - } - - try { - metadataFileContent = await readFile(this.file(METADATA_FILE_NAME), 'utf-8') - context = JSON.parse(metadataFileContent) - } catch (error) { - // Indicates no buffer metadata file or it's corrupted - status.error('๐Ÿงจ', '[session-manager] failed to read buffer metadata.json', { - ...this.context, - error, - }) - - this.captureMessage('Failed to read buffer metadata.json', { error }) - - // NOTE: This is not ideal... we fallback to loading the buffer.jsonl and deriving metadata from that as best as possible - // If that still fails then we have to bail out and drop the buffer.jsonl (data loss...) - - try { - const stats = await stat(this.file(BUFFER_FILE_NAME)) - - context = { - sizeEstimate: stats.size, - count: 1, // We can't afford to load the whole file into memory so we assume 1 line - eventsRange: { - firstTimestamp: Math.round(stats.birthtimeMs), - // This is really less than ideal but we don't have much choice - lastTimestamp: Date.now(), - }, - createdAt: Math.round(stats.birthtimeMs), - } - } catch (error) { - status.error('๐Ÿงจ', '[session-manager] failed to determine metadata from buffer file', { - ...this.context, - error, - }) - } - } - - if (!context) { - // Indicates we couldn't successfully read the metadata file - await unlink(this.file(METADATA_FILE_NAME)).catch(() => null) - await unlink(this.file(BUFFER_FILE_NAME)).catch(() => null) - - bufferLoadFailedCounter.inc() - - this.captureException(new Error('Failed to read buffer metadata. Resorted to hard deletion'), { - metadataFileContent, - }) - - return - } - - this.buffer = context - - status.info('๐Ÿ“ฆ', '[session-manager] started new manager from existing file', { - ...this.context, - ...(this.buffer ?? {}), - }) - } - - private async syncMetadata(): Promise { - if (this.buffer) { - await writeFile(this.file(METADATA_FILE_NAME), JSON.stringify(this.buffer), 'utf-8') - } else { - await unlink(this.file(METADATA_FILE_NAME)) - } - } - - private async getFlushFiles(): Promise { - return (await readdir(this.context.dir)).filter((file) => file.endsWith(FLUSH_FILE_EXTENSION)) - } - - private captureException(error: Error, extra: Record = {}): void { - captureException(error, { - extra: { ...this.context, ...extra }, - tags: { teamId: this.context.teamId, sessionId: this.context.sessionId }, - }) - } - - private captureMessage(message: string, extra: Record = {}): void { - const context = this.context - captureMessage(message, { - extra: { ...context, ...extra }, - tags: { teamId: context.teamId, sessionId: context.sessionId }, - }) - } - - public async add(message: IncomingRecordingMessage): Promise { - if (this.destroying) { - return - } - - await this.setupPromise - - try { - const buffer = this.getOrCreateBuffer() - - const content = - convertForPersistence(message.eventsByWindowId) - .map((x) => JSON.stringify(x)) - .join('\n') + '\n' - - buffer.count += 1 - buffer.sizeEstimate += content.length - buffer.eventsRange = { - firstTimestamp: - minDefined(message.eventsRange.start, buffer.eventsRange?.firstTimestamp) ?? - message.eventsRange.start, - lastTimestamp: - maxDefined(message.eventsRange.end, buffer.eventsRange?.lastTimestamp) ?? 
message.eventsRange.end, - } - - if (!this.bufferWriteStream!.write(content, 'utf-8')) { - writeStreamBlocked.inc() - - const stopTimer = histogramBackpressureBlockedSeconds.startTimer() - await new Promise((r) => this.bufferWriteStream!.once('drain', r)) - stopTimer() - } - await this.syncMetadata() - } catch (error) { - this.captureException(error, { message }) - throw error - } - } - - public async isEmpty(): Promise { - return !this.buffer?.count && !(await this.getFlushFiles()).length - } - - public async flush(force = false): Promise { - if (this.destroying) { - return - } - - await this.setupPromise - - if (!force) { - await this.maybeFlushCurrentBuffer() - } else { - // This is mostly used by tests - await this.markCurrentBufferForFlush() - } - - await this.flushFiles() - } - - private async maybeFlushCurrentBuffer(): Promise { - if (!this.buffer) { - return - } - - if (this.buffer.sizeEstimate >= this.serverConfig.SESSION_RECORDING_MAX_BUFFER_SIZE_KB * 1024) { - return this.markCurrentBufferForFlush() - } - - const flushThresholdMs = this.serverConfig.SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS * 1000 - const flushThresholdJitteredMs = flushThresholdMs * this.flushJitterMultiplier - - const logContext: Record = { - ...this.context, - flushThresholdMs, - flushThresholdJitteredMs, - } - - if (!this.buffer.count) { - status.warn('๐Ÿšฝ', `[session-manager] buffer has no items yet`, { logContext }) - return - } - - const bufferAgeInMemoryMs = now() - this.buffer.createdAt - - // check the in-memory age against a larger value than the flush threshold, - // otherwise we'll flap between reasons for flushing when close to real-time processing - const isSessionAgeOverThreshold = bufferAgeInMemoryMs >= flushThresholdJitteredMs - - logContext['bufferAgeInMemoryMs'] = bufferAgeInMemoryMs - logContext['isSessionAgeOverThreshold'] = isSessionAgeOverThreshold - - histogramSessionAgeSeconds.observe(bufferAgeInMemoryMs / 1000) - histogramSessionSize.observe(this.buffer.count) - histogramSessionSizeKb.observe(this.buffer.sizeEstimate / 1024) - - if (isSessionAgeOverThreshold) { - return this.markCurrentBufferForFlush() - } - } - - private async markCurrentBufferForFlush(): Promise { - const buffer = this.buffer - if (!buffer) { - // TODO: maybe error properly here? - return - } - - if (!buffer.eventsRange || !buffer.count) { - // Indicates some issue with the buffer so we can close out - this.buffer = undefined - return - } - - // ADD FLUSH METRICS HERE - - const { firstTimestamp, lastTimestamp } = buffer.eventsRange - const fileName = `${firstTimestamp}-${lastTimestamp}${FLUSH_FILE_EXTENSION}` - - histogramS3LinesWritten.observe(buffer.count) - histogramS3KbWritten.observe(buffer.sizeEstimate / 1024) - - await new Promise((resolve) => (this.bufferWriteStream ? 
this.bufferWriteStream.end(resolve) : resolve())) - await rename(this.file(BUFFER_FILE_NAME), this.file(fileName)) - this.buffer = undefined - - await this.syncMetadata() - } - - private async flushFiles(): Promise { - // We read all files marked for flushing and write them to S3 - const filesToFlush = await this.getFlushFiles() - await Promise.all(filesToFlush.map((file) => this.flushFile(file))) - } - - private async flushFile(filename: string): Promise { - status.info('๐Ÿšฝ', '[session-manager] flushing file to S3', { - filename, - ...this.context, - }) - if (this.destroying) { - status.warn('๐Ÿšฝ', '[session-manager] flush called but we are in a destroying state', { - ...this.context, - }) - return - } - - const file = this.file(filename) - - const deleteFile = async () => { - await stat(file) - await unlink(file) - } - - const endFlushTimer = histogramFlushTimeSeconds.startTimer() - - try { - const targetFileName = filename.replace(FLUSH_FILE_EXTENSION, FILE_EXTENSION) - const baseKey = `${this.serverConfig.SESSION_RECORDING_REMOTE_FOLDER}/team_id/${this.context.teamId}/session_id/${this.context.sessionId}` - const dataKey = `${baseKey}/data/${targetFileName}` - - const readStream = createReadStream(file) - const uploadStream = new PassThrough() - - // The compressed file - pipeline(readStream, zlib.createGzip(), uploadStream).catch((error) => { - // TODO: If this actually happens we probably want to destroy the buffer as we will be stuck... - status.error('๐Ÿงจ', '[session-manager] writestream errored', { - ...this.context, - error, - }) - this.captureException(error) - }) - - readStream.on('error', (err) => { - // TODO: What should we do here? - status.error('๐Ÿงจ', '[session-manager] readstream errored', { - ...this.context, - error: err, - }) - - this.captureException(err) - }) - - const inProgressUpload = (this.inProgressUpload = new Upload({ - client: this.s3Client, - params: { - Bucket: this.serverConfig.OBJECT_STORAGE_BUCKET, - Key: dataKey, - ContentEncoding: 'gzip', - ContentType: 'application/jsonl', - Body: uploadStream, - }, - })) - - await asyncTimeoutGuard( - { - message: 'session-manager.flush uploading file to S3 delayed.', - timeout: S3_UPLOAD_WARN_TIME_SECONDS, - }, - async () => { - await inProgressUpload.done() - } - ) - - counterS3FilesWritten.inc(1) - } catch (error: any) { - // TRICKY: error can for some reason sometimes be undefined... 
- error = error || new Error('Unknown Error') - - if (error.name === 'AbortError' && this.destroying) { - // abort of inProgressUpload while destroying is expected - return - } - - await this.inProgressUpload?.abort() - - // TODO: If we fail to write to S3 we should be do something about it - status.error('๐Ÿงจ', '[session-manager] failed writing session recording blob to S3', { - errorMessage: `${error.name || 'Unknown Error Type'}: ${error.message}`, - error, - ...this.context, - }) - this.captureException(error) - counterS3WriteErrored.inc() - - throw error - } finally { - endFlushTimer() - await deleteFile() - } - } - - private getOrCreateBuffer(): SessionManagerBufferContext { - if (!this.buffer) { - try { - const buffer: SessionManagerBufferContext = { - sizeEstimate: 0, - count: 0, - eventsRange: null, - createdAt: now(), - } - - this.buffer = buffer - } catch (error) { - this.captureException(error) - throw error - } - } - - if (this.buffer && !this.bufferWriteStream) { - this.bufferWriteStream = this.createFileStreamFor(path.join(this.context.dir, BUFFER_FILE_NAME)) - } - - return this.buffer - } - - protected createFileStreamFor(file: string): WriteStream { - return createWriteStream(file, { - // Opens in append mode in case it already exists - flags: 'a', - encoding: 'utf-8', - }) - } - - public async stop(): Promise { - this.destroying = true - if (this.inProgressUpload !== null) { - await this.inProgressUpload.abort().catch((error) => { - status.error('๐Ÿงจ', '[session-manager][realtime] failed to abort in progress upload', { - ...this.context, - error, - }) - this.captureException(error) - }) - this.inProgressUpload = null - } - - await new Promise((resolve) => (this.bufferWriteStream ? this.bufferWriteStream.end(resolve) : resolve())) - - if (await this.isEmpty()) { - status.info('๐Ÿงจ', '[session-manager] removing empty session directory', { - ...this.context, - }) - - await rmdir(this.context.dir, { recursive: true }) - } - } -} diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts deleted file mode 100644 index 09d0be45375..00000000000 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts +++ /dev/null @@ -1,525 +0,0 @@ -import { captureException } from '@sentry/node' -import { createReadStream } from 'fs' -import { readdir, stat } from 'fs/promises' -import { features, KafkaConsumer, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka' -import path from 'path' -import { Counter, Gauge, Histogram } from 'prom-client' - -import { sessionRecordingConsumerConfig } from '../../../config/config' -import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics' -import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer' -import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../kafka/config' -import { createKafkaProducer } from '../../../kafka/producer' -import { PluginsServerConfig, TeamId } from '../../../types' -import { BackgroundRefresher } from '../../../utils/background-refresher' -import { KafkaProducerWrapper } from '../../../utils/db/kafka-producer-wrapper' -import { PostgresRouter } from '../../../utils/db/postgres' -import { status } from '../../../utils/status' -import { fetchTeamTokensWithRecordings } from '../../../worker/ingestion/team-manager' -import { expressApp } from 
'../../services/http-server' -import { ObjectStorage } from '../../services/object_storage' -import { runInstrumentedFunction } from '../../utils' -import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics' -import { ConsoleLogsIngester } from './services/console-logs-ingester' -import { ReplayEventsIngester } from './services/replay-events-ingester' -import { BUCKETS_KB_WRITTEN, BUFFER_FILE_NAME, SessionManagerV3 } from './services/session-manager-v3' -import { IncomingRecordingMessage } from './types' -import { allSettledWithConcurrency, parseKafkaMessage, reduceRecordingMessages } from './utils' - -// Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals -require('@sentry/tracing') - -// WARNING: Do not change this - it will essentially reset the consumer -const KAFKA_CONSUMER_GROUP_ID = 'session-replay-ingester' -const KAFKA_CONSUMER_SESSION_TIMEOUT_MS = 60000 - -// NOTE: To remove once released -const metricPrefix = 'v3_' - -const gaugeSessionsHandled = new Gauge({ - name: metricPrefix + 'recording_blob_ingestion_session_manager_count', - help: 'A gauge of the number of sessions being handled by this blob ingestion consumer', -}) - -const histogramKafkaBatchSize = new Histogram({ - name: metricPrefix + 'recording_blob_ingestion_kafka_batch_size', - help: 'The size of the batches we are receiving from Kafka', - buckets: [0, 50, 100, 250, 500, 750, 1000, 1500, 2000, 3000, Infinity], -}) - -const histogramKafkaBatchSizeKb = new Histogram({ - name: metricPrefix + 'recording_blob_ingestion_kafka_batch_size_kb', - help: 'The size in kb of the batches we are receiving from Kafka', - buckets: BUCKETS_KB_WRITTEN, -}) - -const counterKafkaMessageReceived = new Counter({ - name: metricPrefix + 'recording_blob_ingestion_kafka_message_received', - help: 'The number of messages we have received from Kafka', - labelNames: ['partition'], -}) - -export interface TeamIDWithConfig { - teamId: TeamId | null - consoleLogIngestionEnabled: boolean -} - -/** - * @deprecated Delete reduceRecordingMessages and associated tests when deleting this. - * - * The SessionRecordingIngesterV3 - * relies on EFS network storage to avoid the need to delay kafka commits and instead uses the disk - * as the persistent volume for both blob data and the metadata around ingestion. - */ -export class SessionRecordingIngesterV3 { - sessions: Record = {} - replayEventsIngester?: ReplayEventsIngester - consoleLogsIngester?: ConsoleLogsIngester - batchConsumer?: BatchConsumer - teamsRefresher: BackgroundRefresher> - config: PluginsServerConfig - topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS - isStopping = false - - private promises: Set> = new Set() - // if ingestion is lagging on a single partition it is often hard to identify _why_, - // this allows us to output more information for that partition - private debugPartition: number | undefined = undefined - private sharedClusterProducerWrapper: KafkaProducerWrapper | undefined - - constructor( - private globalServerConfig: PluginsServerConfig, - private postgres: PostgresRouter, - private objectStorage: ObjectStorage - ) { - this.debugPartition = globalServerConfig.SESSION_RECORDING_DEBUG_PARTITION - ? 
parseInt(globalServerConfig.SESSION_RECORDING_DEBUG_PARTITION) - : undefined - - // NOTE: globalServerConfig contains the default pluginServer values, typically not pointing at dedicated resources like kafka or redis - // We still connect to some of the non-dedicated resources such as postgres or the Replay events kafka. - this.config = sessionRecordingConsumerConfig(globalServerConfig) - - this.teamsRefresher = new BackgroundRefresher(async () => { - try { - status.info('๐Ÿ”', 'session-replay-ingestion - refreshing teams in the background') - return await fetchTeamTokensWithRecordings(this.postgres) - } catch (e) { - status.error('๐Ÿ”ฅ', 'session-replay-ingestion - failed to refresh teams in the background', e) - captureException(e) - throw e - } - }) - } - - private get rootDir() { - return path.join(this.config.SESSION_RECORDING_LOCAL_DIRECTORY, 'session-recordings') - } - - private dirForSession(partition: number, teamId: number, sessionId: string): string { - return path.join(this.rootDir, `${partition}`, `${teamId}__${sessionId}`) - } - - private get connectedBatchConsumer(): KafkaConsumer | undefined { - // Helper to only use the batch consumer if we are actually connected to it - otherwise it will throw errors - const consumer = this.batchConsumer?.consumer - return consumer && consumer.isConnected() ? consumer : undefined - } - - private get assignedTopicPartitions(): TopicPartition[] { - return this.connectedBatchConsumer?.assignments() ?? [] - } - - private get assignedPartitions(): TopicPartition['partition'][] { - return this.assignedTopicPartitions.map((x) => x.partition) - } - - private scheduleWork(promise: Promise): Promise { - /** - * Helper to handle graceful shutdowns. Every time we do some work we add a promise to this array and remove it when finished. - * That way when shutting down we can wait for all promises to finish before exiting. - */ - this.promises.add(promise) - - // we void the promise returned by finally here to avoid the need to await it - void promise.finally(() => this.promises.delete(promise)) - - return promise - } - - public async consume(event: IncomingRecordingMessage): Promise { - const { team_id, session_id } = event - const key = `${team_id}__${session_id}` - - const { partition } = event.metadata - if (this.debugPartition === partition) { - status.info('๐Ÿ”', '[session-replay-ingestion] - [PARTITION DEBUG] - consuming event', { - ...event.metadata, - }) - } - - if (!this.sessions[key]) { - const { partition } = event.metadata - - // NOTE: It's important that this stays sync so that parallel calls will not create multiple session managers - this.sessions[key] = new SessionManagerV3(this.config, this.objectStorage.s3, { - teamId: team_id, - sessionId: session_id, - dir: this.dirForSession(partition, team_id, session_id), - partition, - }) - } - - await this.sessions[key]?.add(event) - } - - public async handleEachBatch(messages: Message[], heartbeat: () => void): Promise { - status.info('๐Ÿ”', `session-replay-ingestion - handling batch`, { - size: messages.length, - partitionsInBatch: [...new Set(messages.map((x) => x.partition))], - assignedPartitions: this.assignedTopicPartitions.map((x) => x.partition), - sessionsHandled: Object.keys(this.sessions).length, - }) - - await runInstrumentedFunction({ - statsKey: `recordingingester.handleEachBatch`, - logExecutionTime: true, - func: async () => { - histogramKafkaBatchSize.observe(messages.length) - histogramKafkaBatchSizeKb.observe(messages.reduce((acc, m) => (m.value?.length ?? 
0) + acc, 0) / 1024) - - let recordingMessages: IncomingRecordingMessage[] = [] - - await runInstrumentedFunction({ - statsKey: `recordingingester.handleEachBatch.parseKafkaMessages`, - func: async () => { - for (const message of messages) { - counterKafkaMessageReceived.inc({ partition: message.partition }) - - const recordingMessage = await parseKafkaMessage( - message, - (token) => - this.teamsRefresher.get().then((teams) => ({ - teamId: teams[token]?.teamId || null, - consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true, - })), - // v3 consumer does not emit ingestion warnings - undefined - ) - - if (recordingMessage) { - recordingMessages.push(recordingMessage) - } - } - - recordingMessages = reduceRecordingMessages(recordingMessages) - }, - }) - - heartbeat() - - await runInstrumentedFunction({ - statsKey: `recordingingester.handleEachBatch.ensureSessionsAreLoaded`, - func: async () => { - await this.syncSessionsWithDisk(heartbeat) - }, - }) - - heartbeat() - - await runInstrumentedFunction({ - statsKey: `recordingingester.handleEachBatch.consumeBatch`, - func: async () => { - if (this.config.SESSION_RECORDING_PARALLEL_CONSUMPTION) { - await Promise.all(recordingMessages.map((x) => this.consume(x).then(heartbeat))) - } else { - for (const message of recordingMessages) { - await this.consume(message) - } - } - }, - }) - - heartbeat() - - await runInstrumentedFunction({ - statsKey: `recordingingester.handleEachBatch.flushAllReadySessions`, - func: async () => { - // TODO: This can time out if it ends up being overloaded - we should have a max limit here - await this.flushAllReadySessions(heartbeat) - }, - }) - - if (this.replayEventsIngester) { - await runInstrumentedFunction({ - statsKey: `recordingingester.handleEachBatch.consumeReplayEvents`, - func: async () => { - await this.replayEventsIngester!.consumeBatch(recordingMessages) - }, - }) - } - - if (this.consoleLogsIngester) { - await runInstrumentedFunction({ - statsKey: `recordingingester.handleEachBatch.consumeConsoleLogEvents`, - func: async () => { - await this.consoleLogsIngester!.consumeBatch(recordingMessages) - }, - }) - } - }, - }) - } - - public async start(): Promise { - status.info('๐Ÿ”', 'session-replay-ingestion - starting session recordings blob consumer', { - librdKafkaVersion: librdkafkaVersion, - kafkaCapabilities: features, - }) - - this.setupHttpRoutes() - - // Load teams into memory - await this.teamsRefresher.refresh() - - // NOTE: This is the only place where we need to use the shared server config - const globalConnectionConfig = createRdConnectionConfigFromEnvVars(this.globalServerConfig) - const globalProducerConfig = createRdProducerConfigFromEnvVars(this.globalServerConfig) - - this.sharedClusterProducerWrapper = new KafkaProducerWrapper( - await createKafkaProducer(globalConnectionConfig, globalProducerConfig) - ) - this.sharedClusterProducerWrapper.producer.connect() - - // NOTE: This is the only place where we need to use the shared server config - if (this.config.SESSION_RECORDING_CONSOLE_LOGS_INGESTION_ENABLED) { - this.consoleLogsIngester = new ConsoleLogsIngester(this.sharedClusterProducerWrapper.producer) - } - - if (this.config.SESSION_RECORDING_REPLAY_EVENTS_INGESTION_ENABLED) { - this.replayEventsIngester = new ReplayEventsIngester(this.sharedClusterProducerWrapper.producer) - } - - // Create a node-rdkafka consumer that fetches batches of messages, runs - // eachBatchWithContext, then commits offsets for the batch. 
- // the batch consumer reads from the session replay kafka cluster - const replayClusterConnectionConfig = createRdConnectionConfigFromEnvVars(this.config) - this.batchConsumer = await startBatchConsumer({ - connectionConfig: replayClusterConnectionConfig, - groupId: KAFKA_CONSUMER_GROUP_ID, - topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, - autoCommit: true, // NOTE: This is the crucial difference between this and the other consumer - sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS, - maxPollIntervalMs: this.config.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS, - // the largest size of a message that can be fetched by the consumer. - // the largest size our MSK cluster allows is 20MB - // we only use 9 or 10MB but there's no reason to limit this ๐Ÿคท๏ธ - consumerMaxBytes: this.config.KAFKA_CONSUMPTION_MAX_BYTES, - consumerMaxBytesPerPartition: this.config.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION, - // our messages are very big, so we don't want to buffer too many - queuedMinMessages: this.config.SESSION_RECORDING_KAFKA_QUEUE_SIZE, - consumerMaxWaitMs: this.config.KAFKA_CONSUMPTION_MAX_WAIT_MS, - consumerErrorBackoffMs: this.config.KAFKA_CONSUMPTION_ERROR_BACKOFF_MS, - fetchBatchSize: this.config.SESSION_RECORDING_KAFKA_BATCH_SIZE, - batchingTimeoutMs: this.config.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS, - topicCreationTimeoutMs: this.config.KAFKA_TOPIC_CREATION_TIMEOUT_MS, - eachBatch: async (messages, { heartbeat }) => { - return await this.scheduleWork(this.handleEachBatch(messages, heartbeat)) - }, - callEachBatchWhenEmpty: true, // Useful as we will still want to account for flushing sessions - debug: this.config.SESSION_RECORDING_KAFKA_DEBUG, - }) - - addSentryBreadcrumbsEventListeners(this.batchConsumer.consumer) - - this.batchConsumer.consumer.on('disconnected', async (err) => { - // since we can't be guaranteed that the consumer will be stopped before some other code calls disconnect - // we need to listen to disconnect and make sure we're stopped - status.info('๐Ÿ”', 'session-replay-ingestion batch consumer disconnected, cleaning up', { err }) - await this.stop() - }) - } - - public async stop(): Promise[]> { - this.isStopping = true - status.info('๐Ÿ”', 'session-replay-ingestion - stopping') - - // NOTE: We have to get the partitions before we stop the consumer as it throws if disconnected - // const assignedPartitions = this.assignedTopicPartitions - // Mark as stopping so that we don't actually process any more incoming messages, but still keep the process alive - await this.batchConsumer?.stop() - - void this.scheduleWork( - Promise.allSettled( - Object.entries(this.sessions).map(([key, sessionManager]) => this.destroySession(key, sessionManager)) - ) - ) - - const promiseResults = await Promise.allSettled(this.promises) - - if (this.sharedClusterProducerWrapper) { - await this.sharedClusterProducerWrapper.disconnect() - } - - status.info('๐Ÿ‘', 'session-replay-ingestion - stopped!') - - return promiseResults - } - - public isHealthy() { - // TODO: Maybe extend this to check if we are shutting down so we don't get killed early. 
- return this.batchConsumer?.isHealthy() - } - - async flushAllReadySessions(heartbeat: () => void): Promise { - const sessions = Object.entries(this.sessions) - - // NOTE: We want to avoid flushing too many sessions at once as it can cause a lot of disk backpressure stalling the consumer - await allSettledWithConcurrency( - this.config.SESSION_RECORDING_MAX_PARALLEL_FLUSHES, - sessions, - async ([key, sessionManager], ctx) => { - heartbeat() - - if (this.isStopping) { - // We can end up with a large number of flushes. We want to stop early if we hit shutdown - return ctx.break() - } - - if (!this.assignedPartitions.includes(sessionManager.context.partition)) { - await this.destroySession(key, sessionManager) - return - } - - await sessionManager - .flush() - .catch((err) => { - status.error( - '๐Ÿšฝ', - 'session-replay-ingestion - failed trying to flush on idle session: ' + - sessionManager.context.sessionId, - { - err, - session_id: sessionManager.context.sessionId, - } - ) - captureException(err, { tags: { session_id: sessionManager.context.sessionId } }) - }) - .then(async () => { - // If the SessionManager is done (flushed and with no more queued events) then we remove it to free up memory - if (await sessionManager.isEmpty()) { - await this.destroySession(key, sessionManager) - } - }) - } - ) - - gaugeSessionsHandled.set(Object.keys(this.sessions).length) - } - - private async syncSessionsWithDisk(heartbeat: () => void): Promise { - // NOTE: With a lot of files on disk this can take a long time - // We need to ensure that as we loop we double check that we are still in charge of the partitions - - // TODO: Implement that (and also for flushing) it sync the assigned partitions with the current state of the consumer - - // As we may get assigned and reassigned partitions, we want to make sure that we have all sessions loaded into memory - - for (const partition of this.assignedPartitions) { - const keys = await readdir(path.join(this.rootDir, `${partition}`)).catch(() => { - // This happens if there are no files on disk for that partition yet - return [] - }) - - const relatedKeys = keys.filter((x) => /\d+__[a-zA-Z0-9\-]+/.test(x)) - - for (const key of relatedKeys) { - // TODO: Ensure sessionId can only be a uuid - const [teamId, sessionId] = key.split('__') - - if (this.isStopping) { - // We can end up with a large number of files we are processing. 
We want to stop early if we hit shutdown - return - } - - if (!this.assignedPartitions.includes(partition)) { - // Account for rebalances - continue - } - - if (!this.sessions[key]) { - this.sessions[key] = new SessionManagerV3(this.config, this.objectStorage.s3, { - teamId: parseInt(teamId), - sessionId, - dir: this.dirForSession(partition, parseInt(teamId), sessionId), - partition, - }) - - await this.sessions[key].setupPromise - } - heartbeat() - } - } - } - - private async destroySession(key: string, sessionManager: SessionManagerV3): Promise { - delete this.sessions[key] - await sessionManager.stop() - } - - private setupHttpRoutes() { - // Mimic the app server's endpoint - expressApp.get( - '/api/projects/:projectId/session_recordings/:sessionId/snapshots', - async (req: any, res: any) => { - await runInstrumentedFunction({ - statsKey: `recordingingester.http.getSnapshots`, - func: async () => { - try { - const startTime = Date.now() - res.on('finish', function () { - status.info('โšก๏ธ', `GET ${req.url} - ${res.statusCode} - ${Date.now() - startTime}ms`) - }) - - // validate that projectId is a number and sessionId is UUID like - const projectId = parseInt(req.params.projectId) - if (isNaN(projectId)) { - res.sendStatus(404) - return - } - - const sessionId = req.params.sessionId - if (!/^[0-9a-f-]+$/.test(sessionId)) { - res.sendStatus(404) - return - } - - status.info('๐Ÿ”', 'session-replay-ingestion - fetching session', { projectId, sessionId }) - - // We don't know the partition upfront so we have to recursively check all partitions - const partitions = await readdir(this.rootDir).catch(() => []) - - for (const partition of partitions) { - const sessionDir = this.dirForSession(parseInt(partition), projectId, sessionId) - const exists = await stat(sessionDir).catch(() => null) - - if (!exists) { - continue - } - - const fileStream = createReadStream(path.join(sessionDir, BUFFER_FILE_NAME)) - fileStream.pipe(res) - return - } - - res.sendStatus(404) - } catch (e) { - status.error('๐Ÿ”ฅ', 'session-replay-ingestion - failed to fetch session', e) - res.sendStatus(500) - } - }, - }) - } - ) - } -} diff --git a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts index 86f232fa574..e0426a12670 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts @@ -7,7 +7,6 @@ import { Counter } from 'prom-client' import { PipelineEvent, RawEventMessage, RRWebEvent } from '../../../types' import { KafkaProducerWrapper } from '../../../utils/db/kafka-producer-wrapper' import { status } from '../../../utils/status' -import { cloneObject } from '../../../utils/utils' import { captureIngestionWarning } from '../../../worker/ingestion/utils' import { eventDroppedCounter } from '../metrics' import { TeamIDWithConfig } from './session-recordings-consumer' @@ -359,52 +358,6 @@ export const parseKafkaBatch = async ( } } -/** - * @deprecated Delete when removing session-recordings-consumer-v3 - */ -export const reduceRecordingMessages = (messages: IncomingRecordingMessage[]): IncomingRecordingMessage[] => { - /** - * It can happen that a single batch contains all messages for the same session. - * A big perf win here is to group everything up front and then reduce the messages - * to a single message per session. 
- */ - const reducedMessages: Record = {} - - for (const message of messages) { - const key = `${message.team_id}-${message.session_id}` - if (!reducedMessages[key]) { - reducedMessages[key] = cloneObject(message) - } else { - const existingMessage = reducedMessages[key] - for (const [windowId, events] of Object.entries(message.eventsByWindowId)) { - if (existingMessage.eventsByWindowId[windowId]) { - existingMessage.eventsByWindowId[windowId].push(...events) - } else { - existingMessage.eventsByWindowId[windowId] = events - } - } - existingMessage.metadata.rawSize += message.metadata.rawSize - - // Update the events ranges - existingMessage.metadata.lowOffset = Math.min( - existingMessage.metadata.lowOffset, - message.metadata.lowOffset - ) - - existingMessage.metadata.highOffset = Math.max( - existingMessage.metadata.highOffset, - message.metadata.highOffset - ) - - // Update the events ranges - existingMessage.eventsRange.start = Math.min(existingMessage.eventsRange.start, message.eventsRange.start) - existingMessage.eventsRange.end = Math.max(existingMessage.eventsRange.end, message.eventsRange.end) - } - } - - return Object.values(reducedMessages) -} - export const convertForPersistence = ( messages: IncomingRecordingMessage['eventsByWindowId'] ): PersistedRecordingMessage[] => { diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index bbb153b2ee6..870226cb75e 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -39,7 +39,6 @@ import { } from './ingestion-queues/on-event-handler-consumer' import { startScheduledTasksConsumer } from './ingestion-queues/scheduled-tasks-consumer' import { SessionRecordingIngester } from './ingestion-queues/session-recording/session-recordings-consumer' -import { SessionRecordingIngesterV3 } from './ingestion-queues/session-recording/session-recordings-consumer-v3' import { setupCommonRoutes } from './services/http-server' import { getObjectStorage } from './services/object_storage' @@ -482,27 +481,6 @@ export async function startPluginsServer( } } - if (capabilities.sessionRecordingV3Ingestion) { - const recordingConsumerConfig = sessionRecordingConsumerConfig(serverConfig) - const postgres = hub?.postgres ?? new PostgresRouter(serverConfig) - const s3 = hub?.objectStorage ?? getObjectStorage(recordingConsumerConfig) - - if (!s3) { - throw new Error("Can't start session recording ingestion without object storage") - } - // NOTE: We intentionally pass in the original serverConfig as the ingester uses both kafkas - const ingester = new SessionRecordingIngesterV3(serverConfig, postgres, s3) - await ingester.start() - - const batchConsumer = ingester.batchConsumer - - if (batchConsumer) { - stopSessionRecordingBlobConsumer = () => ingester.stop() - shutdownOnConsumerExit(batchConsumer) - healthChecks['session-recordings-ingestion'] = () => ingester.isHealthy() ?? false - } - } - if (capabilities.personOverrides) { const postgres = hub?.postgres ?? new PostgresRouter(serverConfig) const kafkaProducer = hub?.kafkaProducer ?? 
(await createKafkaProducerWrapper(serverConfig)) diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index f3106fe62dd..a716f7bfe97 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -78,7 +78,6 @@ export enum PluginServerMode { analytics_ingestion = 'analytics-ingestion', recordings_blob_ingestion = 'recordings-blob-ingestion', recordings_blob_ingestion_overflow = 'recordings-blob-ingestion-overflow', - recordings_ingestion_v3 = 'recordings-ingestion-v3', person_overrides = 'person-overrides', } @@ -311,7 +310,6 @@ export interface PluginServerCapabilities { processAsyncWebhooksHandlers?: boolean sessionRecordingBlobIngestion?: boolean sessionRecordingBlobOverflowIngestion?: boolean - sessionRecordingV3Ingestion?: boolean personOverrides?: boolean appManagementSingleton?: boolean preflightSchedules?: boolean // Used for instance health checks on hobby deploy, not useful on cloud diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/__snapshots__/utils.test.ts.snap b/plugin-server/tests/main/ingestion-queues/session-recording/__snapshots__/utils.test.ts.snap index 9a708df6f6c..6a23a3d9882 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/__snapshots__/utils.test.ts.snap +++ b/plugin-server/tests/main/ingestion-queues/session-recording/__snapshots__/utils.test.ts.snap @@ -1,103 +1,5 @@ // Jest Snapshot v1, https://goo.gl/fbAQLP -exports[` 1`] = ` -Array [ - Object { - "distinct_id": "1", - "eventsByWindowId": Object { - "window_1": Array [ - Object { - "data": Object {}, - "timestamp": 1, - "type": 1, - }, - Object { - "data": Object {}, - "timestamp": 2, - "type": 2, - }, - ], - "window_2": Array [ - Object { - "data": Object {}, - "timestamp": 3, - "type": 3, - }, - ], - }, - "eventsRange": Object { - "end": 3, - "start": 1, - }, - "metadata": Object { - "highOffset": 3, - "lowOffset": 1, - "partition": 1, - "rawSize": 12, - "timestamp": 1, - "topic": "the_topic", - }, - "session_id": "1", - "snapshot_source": null, - "team_id": 1, - }, - Object { - "distinct_id": "1", - "eventsByWindowId": Object { - "window_1": Array [ - Object { - "data": Object {}, - "timestamp": 4, - "type": 4, - }, - ], - }, - "eventsRange": Object { - "end": 4, - "start": 4, - }, - "metadata": Object { - "highOffset": 4, - "lowOffset": 4, - "partition": 1, - "rawSize": 30, - "timestamp": 4, - "topic": "the_topic", - }, - "session_id": "1", - "snapshot_source": null, - "team_id": 2, - }, - Object { - "distinct_id": "1", - "eventsByWindowId": Object { - "window_1": Array [ - Object { - "data": Object {}, - "timestamp": 5, - "type": 5, - }, - ], - }, - "eventsRange": Object { - "end": 5, - "start": 5, - }, - "metadata": Object { - "highOffset": 5, - "lowOffset": 5, - "partition": 1, - "rawSize": 31, - "timestamp": 5, - "topic": "the_topic", - }, - "session_id": "2", - "snapshot_source": null, - "team_id": 1, - }, -] -`; - exports[`session-recording utils parseKafkaBatch can parse and reduce a batch of messages 1`] = ` Array [ Object { diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/services/session-manager-v3.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/services/session-manager-v3.test.ts deleted file mode 100644 index ba02198f953..00000000000 --- a/plugin-server/tests/main/ingestion-queues/session-recording/services/session-manager-v3.test.ts +++ /dev/null @@ -1,319 +0,0 @@ -import { Upload } from '@aws-sdk/lib-storage' -import { randomUUID } from 'crypto' -import fs from 'fs/promises' 
-import { DateTime, Settings } from 'luxon' -import path from 'path' -import { PassThrough } from 'stream' -import * as zlib from 'zlib' - -import { defaultConfig } from '../../../../../src/config/config' -import { SessionManagerV3 } from '../../../../../src/main/ingestion-queues/session-recording/services/session-manager-v3' -import { now } from '../../../../../src/main/ingestion-queues/session-recording/utils' -import { createIncomingRecordingMessage } from '../fixtures' - -jest.mock('@aws-sdk/lib-storage', () => { - const mockUpload = jest.fn().mockImplementation(() => { - return { - abort: jest.fn().mockResolvedValue(undefined), - done: jest.fn().mockResolvedValue(undefined), - } - }) - - return { - __esModule: true, - Upload: mockUpload, - } -}) - -const tmpDir = path.join(__dirname, '../../../../../.tmp/test_session_recordings') - -describe('session-manager', () => { - jest.setTimeout(1000) - let sessionManager: SessionManagerV3 - const mockS3Client: any = { - send: jest.fn(), - } - - const createSessionManager = async ( - sessionId = randomUUID(), - teamId = 1, - partition = 1 - ): Promise => { - const manager = new SessionManagerV3(defaultConfig, mockS3Client, { - sessionId, - teamId, - partition, - dir: path.join(tmpDir, `${partition}`, `${teamId}__${sessionId}`), - }) - - await manager.setupPromise - return manager - } - - const flushThreshold = defaultConfig.SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS * 1000 - - beforeEach(async () => { - // it's always May 25 - Settings.now = () => new Date(2018, 4, 25).valueOf() - - if (await fs.stat(tmpDir).catch(() => null)) { - await fs.rmdir(tmpDir, { recursive: true }) - } - - sessionManager = await createSessionManager() - }) - - afterEach(async () => { - await sessionManager?.stop() - // it's no longer always May 25 - Settings.now = () => new Date().valueOf() - }) - - it('adds a message', async () => { - const timestamp = now() - const event = createIncomingRecordingMessage({ - eventsRange: { - start: timestamp, - end: timestamp + 1000, - }, - eventsByWindowId: { - window_id_1: [ - { timestamp: timestamp, type: 4, data: { href: 'http://localhost:3001/' } }, - { timestamp: timestamp + 1000, type: 4, data: { href: 'http://localhost:3001/' } }, - ], - }, - }) - - await sessionManager.add(event) - - expect(sessionManager.buffer).toEqual({ - sizeEstimate: 193, - count: 1, - eventsRange: { firstTimestamp: timestamp, lastTimestamp: timestamp + 1000 }, - createdAt: timestamp, - }) - - const stats = await fs.stat(`${sessionManager.context.dir}/buffer.jsonl`) - expect(stats.size).toBeGreaterThan(0) - }) - - it('does not flush if it has received a message recently', async () => { - const now = DateTime.now() - - const event = createIncomingRecordingMessage({ - metadata: { - timestamp: now, - } as any, - }) - - await sessionManager.add(event) - await sessionManager.flush() - - expect(await fs.readdir(sessionManager.context.dir)).toEqual(['buffer.jsonl', 'metadata.json']) - }) - - it('does flush if the stored file is older than the threshold', async () => { - const firstTimestamp = 1700000000000 - const lastTimestamp = 1700000000000 + 4000 - - const eventOne = createIncomingRecordingMessage({ - eventsRange: { - start: firstTimestamp, - end: firstTimestamp, - }, - eventsByWindowId: { - window_id_1: [{ timestamp: firstTimestamp, type: 4, data: { href: 'http://localhost:3001/' } }], - }, - }) - const eventTwo = createIncomingRecordingMessage({ - eventsRange: { - start: lastTimestamp, - end: lastTimestamp, - }, - eventsByWindowId: { - window_id_1: [{ 
timestamp: lastTimestamp, type: 4, data: { href: 'http://localhost:3001/' } }], - }, - }) - - await sessionManager.add(eventOne) - await sessionManager.add(eventTwo) - - sessionManager.buffer!.createdAt = now() - flushThreshold - 1 - - await sessionManager.flush() - - expect(await fs.readdir(sessionManager.context.dir)).toEqual([]) - - const sessionId = sessionManager.context.sessionId - - // as a proxy for flush having been called or not - const mockUploadCalls = (Upload as unknown as jest.Mock).mock.calls - expect(mockUploadCalls.length).toBe(1) - expect(mockUploadCalls[0].length).toBe(1) - expect(mockUploadCalls[0][0]).toEqual( - expect.objectContaining({ - params: expect.objectContaining({ - Key: `session_recordings/team_id/1/session_id/${sessionId}/data/${firstTimestamp}-${lastTimestamp}.jsonl`, - }), - }) - ) - }) - - it('has a fixed jitter based on the serverConfig', async () => { - const minJitter = 1 - defaultConfig.SESSION_RECORDING_BUFFER_AGE_JITTER - for (const _ of Array(100).keys()) { - const sm = await createSessionManager() - expect(sm.flushJitterMultiplier).toBeGreaterThanOrEqual(minJitter) - expect(sm.flushJitterMultiplier).toBeLessThanOrEqual(1) - } - }) - - it('not remove files when stopped', async () => { - expect(await fs.readdir(sessionManager.context.dir)).toEqual([]) - await sessionManager.add(createIncomingRecordingMessage()) - expect(await fs.readdir(sessionManager.context.dir)).toEqual(['buffer.jsonl', 'metadata.json']) - await sessionManager.stop() - expect(await fs.readdir(sessionManager.context.dir)).toEqual(['buffer.jsonl', 'metadata.json']) - }) - - it('removes the directly when stopped after fully flushed', async () => { - const sm = await createSessionManager('session_id_2', 2, 2) - expect(await fs.readdir(sm.context.dir)).toEqual([]) - await sm.add(createIncomingRecordingMessage()) - expect(await fs.readdir(sm.context.dir)).toEqual(['buffer.jsonl', 'metadata.json']) - await sm.flush(true) - expect(await fs.readdir(sm.context.dir)).toEqual([]) - await sm.stop() - // ;(sessionManager as any) = undefined // Stop the afterEach from failing - - await expect(fs.stat(sm.context.dir)).rejects.toThrowError('ENOENT: no such file or directory') - }) - - it('reads successfully with the stream not closed', async () => { - const event = createIncomingRecordingMessage({ - eventsByWindowId: { - window_id_1: [ - { timestamp: 170000000, type: 4, data: { href: 'http://localhost:3001/' } }, - { timestamp: 170000000 + 1000, type: 4, data: { href: 'http://localhost:3001/' } }, - ], - }, - }) - await sessionManager.add(event) - - const content = await fs.readFile(`${sessionManager.context.dir}/buffer.jsonl`, 'utf-8') - expect(content).toEqual( - '{"window_id":"window_id_1","data":[{"timestamp":170000000,"type":4,"data":{"href":"http://localhost:3001/"}},{"timestamp":170001000,"type":4,"data":{"href":"http://localhost:3001/"}}]}\n' - ) - }) - - it('adds to the existing buffer when restarted', async () => { - const sm1 = await createSessionManager('session_id_2', 2, 2) - - await sm1.add( - createIncomingRecordingMessage({ - eventsByWindowId: { - window_id_1: [ - { timestamp: 170000000, type: 4, data: { href: 'http://localhost:3001/' } }, - { timestamp: 170000000 + 1000, type: 4, data: { href: 'http://localhost:3001/' } }, - ], - }, - }) - ) - - const sm2 = await createSessionManager('session_id_2', 2, 2) - await sm2.add( - createIncomingRecordingMessage({ - eventsByWindowId: { - window_id_1: [ - { timestamp: 170000000 + 2000, type: 4, data: { href: 'http://localhost:3001/' } }, 
- { timestamp: 170000000 + 3000, type: 4, data: { href: 'http://localhost:3001/' } }, - ], - }, - }) - ) - - const buffer = await fs.readFile(`${sm1.context.dir}/buffer.jsonl`, 'utf-8') - expect(buffer).toEqual( - '{"window_id":"window_id_1","data":[{"timestamp":170000000,"type":4,"data":{"href":"http://localhost:3001/"}},{"timestamp":170001000,"type":4,"data":{"href":"http://localhost:3001/"}}]}\n{"window_id":"window_id_1","data":[{"timestamp":170002000,"type":4,"data":{"href":"http://localhost:3001/"}},{"timestamp":170003000,"type":4,"data":{"href":"http://localhost:3001/"}}]}\n' - ) - - await sm1.stop() - await sm2.stop() - }) - - it('uploads a gzip compressed file to S3', async () => { - await sessionManager.add( - createIncomingRecordingMessage({ - eventsRange: { - start: 170000000, - end: 170000000, - }, - eventsByWindowId: { - window_id_1: [{ timestamp: 170000000, type: 4, data: { href: 'http://localhost:3001/' } }], - }, - }) - ) - await sessionManager.flush(true) - - const mockUploadCalls = (Upload as unknown as jest.Mock).mock.calls - expect(mockUploadCalls[0][0]).toMatchObject({ - params: { - Bucket: 'posthog', - Key: `session_recordings/team_id/1/session_id/${sessionManager.context.sessionId}/data/170000000-170000000.jsonl`, - ContentEncoding: 'gzip', - ContentType: 'application/jsonl', - }, - }) - const uploadBody = mockUploadCalls[0][0].params.Body - - expect(uploadBody).toBeInstanceOf(PassThrough) - - // Extract the content from the stream and gzip decompress it - const chunks: Uint8Array[] = [] - for await (const chunk of uploadBody) { - chunks.push(chunk) - } - - const buffer = Buffer.concat(chunks) - const uncompressed = zlib.gunzipSync(buffer).toString('utf-8') - - expect(uncompressed).toEqual( - '{"window_id":"window_id_1","data":[{"timestamp":170000000,"type":4,"data":{"href":"http://localhost:3001/"}}]}\n' - ) - }) - - it('handles a corrupted metadata.json file', async () => { - const sm1 = await createSessionManager('session_id_2', 2, 2) - - await sm1.add( - createIncomingRecordingMessage({ - eventsByWindowId: { - window_id_1: [ - { timestamp: 170000000, type: 4, data: { href: 'http://localhost:3001/' } }, - { timestamp: 170000000 + 1000, type: 4, data: { href: 'http://localhost:3001/' } }, - ], - }, - }) - ) - - await sm1.stop() - - await fs.writeFile(`${sm1.context.dir}/metadata.json`, 'CORRUPTEDDD', 'utf-8') - - const sm2 = await createSessionManager('session_id_2', 2, 2) - - expect(sm2.buffer).toEqual({ - count: 1, - createdAt: expect.any(Number), - eventsRange: { - firstTimestamp: expect.any(Number), - lastTimestamp: expect.any(Number), - }, - sizeEstimate: 185, - }) - - expect(sm2.buffer?.createdAt).toBeGreaterThanOrEqual(0) - expect(sm2.buffer?.eventsRange?.firstTimestamp).toBe(sm2.buffer!.createdAt) - expect(sm2.buffer?.eventsRange?.lastTimestamp).toBeGreaterThanOrEqual(sm2.buffer!.eventsRange!.firstTimestamp) - }) -}) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts deleted file mode 100644 index 26a3af88962..00000000000 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts +++ /dev/null @@ -1,369 +0,0 @@ -import { randomUUID } from 'crypto' -import fs from 'fs/promises' -import { mkdirSync, rmSync } from 'node:fs' -import { TopicPartition, TopicPartitionOffset } from 'node-rdkafka' -import path from 'path' - -import { waitForExpect } from 
'../../../../functional_tests/expectations' -import { defaultConfig } from '../../../../src/config/config' -import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../../src/config/kafka-topics' -import { - SessionManagerBufferContext, - SessionManagerContext, -} from '../../../../src/main/ingestion-queues/session-recording/services/session-manager-v3' -import { SessionRecordingIngesterV3 } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-consumer-v3' -import { Hub, PluginsServerConfig, Team } from '../../../../src/types' -import { createHub } from '../../../../src/utils/db/hub' -import { getFirstTeam, resetTestDatabase } from '../../../helpers/sql' -import { createIncomingRecordingMessage, createKafkaMessage, createTP } from './fixtures' - -const SESSION_RECORDING_REDIS_PREFIX = '@posthog-tests/replay/' - -const tmpDir = path.join(__dirname, '../../../../.tmp/test_session_recordings') - -const noop = () => undefined - -const config: PluginsServerConfig = { - ...defaultConfig, - SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION: true, - SESSION_RECORDING_REDIS_PREFIX, - SESSION_RECORDING_LOCAL_DIRECTORY: tmpDir, -} - -async function deleteKeysWithPrefix(hub: Hub) { - const redisClient = await hub.redisPool.acquire() - const keys = await redisClient.keys(`${SESSION_RECORDING_REDIS_PREFIX}*`) - const pipeline = redisClient.pipeline() - keys.forEach(function (key) { - pipeline.del(key) - }) - await pipeline.exec() - await hub.redisPool.release(redisClient) -} - -const mockConsumer = { - on: jest.fn(), - commitSync: jest.fn(), - commit: jest.fn(), - queryWatermarkOffsets: jest.fn(), - committed: jest.fn(), - assignments: jest.fn(), - isConnected: jest.fn(() => true), - getMetadata: jest.fn(), -} - -jest.mock('../../../../src/kafka/batch-consumer', () => { - return { - startBatchConsumer: jest.fn(() => - Promise.resolve({ - join: () => ({ - finally: jest.fn(), - }), - stop: jest.fn(), - consumer: mockConsumer, - }) - ), - } -}) - -jest.setTimeout(1000) - -describe('ingester', () => { - let ingester: SessionRecordingIngesterV3 - - let hub: Hub - let closeHub: () => Promise - let team: Team - let teamToken = '' - let mockOffsets: Record = {} - let mockCommittedOffsets: Record = {} - const consumedTopic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS - - beforeAll(async () => { - mkdirSync(path.join(config.SESSION_RECORDING_LOCAL_DIRECTORY, 'session-buffer-files'), { recursive: true }) - await resetTestDatabase() - }) - - beforeEach(async () => { - if (await fs.stat(tmpDir).catch(() => null)) { - await fs.rmdir(tmpDir, { recursive: true }) - } - - // The below mocks simulate committing to kafka and querying the offsets - mockCommittedOffsets = {} - mockOffsets = {} - mockConsumer.commit.mockImplementation( - (tpo: TopicPartitionOffset) => (mockCommittedOffsets[tpo.partition] = tpo.offset) - ) - mockConsumer.queryWatermarkOffsets.mockImplementation((_topic, partition, _timeout, cb) => { - cb(null, { highOffset: mockOffsets[partition] ?? 1, lowOffset: 0 }) - }) - - mockConsumer.getMetadata.mockImplementation((options, cb) => { - cb(null, { - topics: [{ name: options.topic, partitions: [{ id: 0 }, { id: 1 }, { id: 2 }] }], - }) - }) - - mockConsumer.committed.mockImplementation((topicPartitions: TopicPartition[], _timeout, cb) => { - const tpos: TopicPartitionOffset[] = topicPartitions.map((tp) => ({ - topic: tp.topic, - partition: tp.partition, - offset: mockCommittedOffsets[tp.partition] ?? 
1, - })) - - cb(null, tpos) - }) - ;[hub, closeHub] = await createHub() - team = await getFirstTeam(hub) - teamToken = team.api_token - await deleteKeysWithPrefix(hub) - - ingester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage) - await ingester.start() - - mockConsumer.assignments.mockImplementation(() => [createTP(0, consumedTopic), createTP(1, consumedTopic)]) - }) - - afterEach(async () => { - jest.setTimeout(10000) - await deleteKeysWithPrefix(hub) - await closeHub() - }) - - afterAll(() => { - rmSync(config.SESSION_RECORDING_LOCAL_DIRECTORY, { recursive: true, force: true }) - jest.useRealTimers() - }) - - const createMessage = (session_id: string, partition = 1) => { - mockOffsets[partition] = mockOffsets[partition] ?? 0 - mockOffsets[partition]++ - - return createKafkaMessage( - consumedTopic, - teamToken, - { - partition, - offset: mockOffsets[partition], - }, - { - $session_id: session_id, - } - ) - } - - // disconnecting a producer is not safe to call multiple times - // in order to let us test stopping the ingester elsewhere - // in most tests we automatically stop the ingester during teardown - describe('when ingester.stop is called in teardown', () => { - afterEach(async () => { - await ingester.stop() - }) - - it('can parse debug partition config', () => { - const config = { - SESSION_RECORDING_DEBUG_PARTITION: '103', - KAFKA_HOSTS: 'localhost:9092', - } satisfies Partial as PluginsServerConfig - - const ingester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage) - expect(ingester['debugPartition']).toEqual(103) - }) - - it('can parse absence of debug partition config', () => { - const config = { - KAFKA_HOSTS: 'localhost:9092', - } satisfies Partial as PluginsServerConfig - - const ingester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage) - expect(ingester['debugPartition']).toBeUndefined() - }) - - it('creates a new session manager if needed', async () => { - const event = createIncomingRecordingMessage() - await ingester.consume(event) - await waitForExpect(() => { - expect(Object.keys(ingester.sessions).length).toBe(1) - expect(ingester.sessions['1__session_id_1']).toBeDefined() - }) - }) - - it('handles multiple incoming sessions', async () => { - const event = createIncomingRecordingMessage() - const event2 = createIncomingRecordingMessage({ - session_id: 'session_id_2', - }) - await Promise.all([ingester.consume(event), ingester.consume(event2)]) - expect(Object.keys(ingester.sessions).length).toBe(2) - expect(ingester.sessions['1__session_id_1']).toBeDefined() - expect(ingester.sessions['1__session_id_2']).toBeDefined() - }) - - it('handles parallel ingestion of the same session', async () => { - const event = createIncomingRecordingMessage() - const event2 = createIncomingRecordingMessage() - await Promise.all([ingester.consume(event), ingester.consume(event2)]) - expect(Object.keys(ingester.sessions).length).toBe(1) - expect(ingester.sessions['1__session_id_1']).toBeDefined() - }) - - it('destroys a session manager if finished', async () => { - const sessionId = `destroys-a-session-manager-if-finished-${randomUUID()}` - const event = createIncomingRecordingMessage({ - session_id: sessionId, - }) - await ingester.consume(event) - expect(ingester.sessions[`1__${sessionId}`]).toBeDefined() - ingester.sessions[`1__${sessionId}`].buffer!.createdAt = 0 - - await ingester.flushAllReadySessions(() => undefined) - - await waitForExpect(() => { - 
expect(ingester.sessions[`1__${sessionId}`]).not.toBeDefined() - }, 10000) - }) - - describe('batch event processing', () => { - it('should batch parse incoming events and batch them to reduce writes', async () => { - mockConsumer.assignments.mockImplementation(() => [createTP(1, consumedTopic)]) - await ingester.handleEachBatch( - [ - createMessage('session_id_1', 1), - createMessage('session_id_1', 1), - createMessage('session_id_1', 1), - createMessage('session_id_2', 1), - ], - noop - ) - - expect(ingester.sessions[`${team.id}__session_id_1`].buffer?.count).toBe(1) - expect(ingester.sessions[`${team.id}__session_id_2`].buffer?.count).toBe(1) - - let fileContents = await fs.readFile( - path.join(ingester.sessions[`${team.id}__session_id_1`].context.dir, 'buffer.jsonl'), - 'utf-8' - ) - - expect(JSON.parse(fileContents).data).toHaveLength(3) - - fileContents = await fs.readFile( - path.join(ingester.sessions[`${team.id}__session_id_2`].context.dir, 'buffer.jsonl'), - 'utf-8' - ) - - expect(JSON.parse(fileContents).data).toHaveLength(1) - }) - }) - - describe('simulated rebalancing', () => { - let otherIngester: SessionRecordingIngesterV3 - jest.setTimeout(5000) // Increased to cover lock delay - - beforeEach(async () => { - otherIngester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage) - await otherIngester.start() - }) - - afterEach(async () => { - await otherIngester.stop() - }) - - const getSessions = ( - ingester: SessionRecordingIngesterV3 - ): (SessionManagerContext & SessionManagerBufferContext)[] => - Object.values(ingester.sessions).map((x) => ({ ...x.context, ...x.buffer! })) - - /** - * It is really hard to actually do rebalance tests against kafka, so we instead simulate the various methods and ensure the correct logic occurs - * Simulates the rebalance and tests that the handled sessions are successfully dropped and picked up - */ - it('rebalances new consumers', async () => { - const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] - const partitionMsgs2 = [createMessage('session_id_3', 2), createMessage('session_id_4', 2)] - - mockConsumer.assignments.mockImplementation(() => [ - createTP(1, consumedTopic), - createTP(2, consumedTopic), - createTP(3, consumedTopic), - ]) - await ingester.handleEachBatch([...partitionMsgs1, ...partitionMsgs2], noop) - - expect(getSessions(ingester)).toMatchObject([ - { sessionId: 'session_id_1', partition: 1, count: 1 }, - { sessionId: 'session_id_2', partition: 1, count: 1 }, - { sessionId: 'session_id_3', partition: 2, count: 1 }, - { sessionId: 'session_id_4', partition: 2, count: 1 }, - ]) - - // Call handleEachBatch with both consumers - we simulate the assignments which - // is what is responsible for the actual syncing of the sessions - mockConsumer.assignments.mockImplementation(() => [ - createTP(2, consumedTopic), - createTP(3, consumedTopic), - ]) - await otherIngester.handleEachBatch( - [createMessage('session_id_4', 2), createMessage('session_id_5', 2)], - noop - ) - mockConsumer.assignments.mockImplementation(() => [createTP(1, consumedTopic)]) - await ingester.handleEachBatch([createMessage('session_id_1', 1)], noop) - - // Should still have the partition 1 sessions that didnt move with added events - expect(getSessions(ingester)).toMatchObject([ - { sessionId: 'session_id_1', partition: 1, count: 2 }, - { sessionId: 'session_id_2', partition: 1, count: 1 }, - ]) - expect(getSessions(otherIngester)).toMatchObject([ - { sessionId: 'session_id_3', partition: 2, 
count: 1 }, - { sessionId: 'session_id_4', partition: 2, count: 2 }, - { sessionId: 'session_id_5', partition: 2, count: 1 }, - ]) - }) - }) - - describe('when a team is disabled', () => { - it('ignores invalid teams', async () => { - // non-zero offset because the code can't commit offset 0 - await ingester.handleEachBatch( - [ - createKafkaMessage(consumedTopic, 'invalid_token', { offset: 12 }), - createKafkaMessage(consumedTopic, 'invalid_token', { offset: 13 }), - ], - noop - ) - - expect(ingester.sessions).toEqual({}) - }) - }) - - describe('heartbeats', () => { - it('it should send them whilst processing', async () => { - const heartbeat = jest.fn() - // non-zero offset because the code can't commit offset 0 - const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] - await ingester.handleEachBatch(partitionMsgs1, heartbeat) - - expect(heartbeat).toBeCalledTimes(5) - }) - }) - }) - - describe('when ingester.stop is not called in teardown', () => { - describe('stop()', () => { - const setup = async (): Promise => { - const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] - await ingester.handleEachBatch(partitionMsgs1, noop) - } - - it('shuts down without error', async () => { - await setup() - - await expect(ingester.stop()).resolves.toMatchObject([ - // destroy sessions, - { status: 'fulfilled' }, - ]) - }) - }) - }) -}) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts index 4e8f71f0cb2..edd4f95bebd 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts @@ -1,7 +1,6 @@ import { Settings } from 'luxon' import { Message, MessageHeader } from 'node-rdkafka' -import { IncomingRecordingMessage } from '../../../../src/main/ingestion-queues/session-recording/types' import { allSettledWithConcurrency, getLagMultiplier, @@ -9,7 +8,6 @@ import { minDefined, parseKafkaBatch, parseKafkaMessage, - reduceRecordingMessages, } from '../../../../src/main/ingestion-queues/session-recording/utils' import { KafkaProducerWrapper } from '../../../../src/utils/db/kafka-producer-wrapper' import { UUIDT } from '../../../../src/utils/utils' @@ -716,67 +714,6 @@ describe('session-recording utils', () => { }) }) - describe('reduceMessages', () => { - const messages: IncomingRecordingMessage[] = [ - // Should merge - { - distinct_id: '1', - eventsRange: { start: 1, end: 1 }, - eventsByWindowId: { window_1: [{ timestamp: 1, type: 1, data: {} }] }, - metadata: { lowOffset: 1, highOffset: 1, partition: 1, timestamp: 1, topic: 'the_topic', rawSize: 5 }, - session_id: '1', - team_id: 1, - snapshot_source: null, - }, - { - distinct_id: '1', - eventsRange: { start: 2, end: 2 }, - eventsByWindowId: { window_1: [{ timestamp: 2, type: 2, data: {} }] }, - metadata: { lowOffset: 2, highOffset: 2, partition: 1, timestamp: 2, topic: 'the_topic', rawSize: 4 }, - session_id: '1', - team_id: 1, - snapshot_source: null, - }, - // Different window_id but should still merge - { - distinct_id: '1', - eventsRange: { start: 3, end: 3 }, - eventsByWindowId: { window_2: [{ timestamp: 3, type: 3, data: {} }] }, - metadata: { lowOffset: 3, highOffset: 3, partition: 1, timestamp: 3, topic: 'the_topic', rawSize: 3 }, - session_id: '1', - team_id: 1, - snapshot_source: null, - }, - // different team - { - distinct_id: '1', - eventsRange: { start: 
4, end: 4 }, - eventsByWindowId: { window_1: [{ timestamp: 4, type: 4, data: {} }] }, - metadata: { lowOffset: 4, highOffset: 4, partition: 1, timestamp: 4, topic: 'the_topic', rawSize: 30 }, - session_id: '1', - team_id: 2, - snapshot_source: null, - }, - // Different session_id - { - distinct_id: '1', - eventsRange: { start: 5, end: 5 }, - eventsByWindowId: { window_1: [{ timestamp: 5, type: 5, data: {} }] }, - metadata: { lowOffset: 5, highOffset: 5, partition: 1, timestamp: 5, topic: 'the_topic', rawSize: 31 }, - session_id: '2', - team_id: 1, - snapshot_source: null, - }, - ] - - // Call it once already to make sure that it doesn't mutate the input - expect(reduceRecordingMessages(messages)).toHaveLength(3) - const reduced = reduceRecordingMessages(messages) - expect(reduceRecordingMessages(messages)).toMatchSnapshot() - expect(reduced[0].eventsRange).toEqual({ start: 1, end: 3 }) - expect(reduced[0].metadata).toMatchObject({ lowOffset: 1, highOffset: 3 }) - }) - describe('allSettledWithConcurrency', () => { jest.setTimeout(1000) it('should resolve promises in parallel with a max consumption', async () => {