Mirror of https://github.com/PostHog/posthog.git

chore: Remove experimental replay ingester (#21969)

Authored by Ben White on 2024-05-06 09:15:25 +02:00, committed by GitHub
parent 9c7d5a7f87
commit 1323449293
10 changed files with 0 additions and 1976 deletions


@ -61,11 +61,6 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
sessionRecordingBlobOverflowIngestion: true,
...sharedCapabilities,
}
case PluginServerMode.recordings_ingestion_v3:
return {
sessionRecordingV3Ingestion: true,
...sharedCapabilities,
}
case PluginServerMode.async_onevent:
return {
processAsyncOnEventHandlers: true,


@@ -1,526 +0,0 @@
import { Upload } from '@aws-sdk/lib-storage'
import { captureException, captureMessage } from '@sentry/node'
import { createReadStream, createWriteStream, WriteStream } from 'fs'
import { mkdir, readdir, readFile, rename, rmdir, stat, unlink, writeFile } from 'fs/promises'
import path from 'path'
import { Counter, Histogram } from 'prom-client'
import { PassThrough } from 'stream'
import { pipeline } from 'stream/promises'
import * as zlib from 'zlib'
import { PluginsServerConfig } from '../../../../types'
import { status } from '../../../../utils/status'
import { asyncTimeoutGuard } from '../../../../utils/timing'
import { ObjectStorage } from '../../../services/object_storage'
import { IncomingRecordingMessage } from '../types'
import { convertForPersistence, maxDefined, minDefined, now } from '../utils'
const BUCKETS_LINES_WRITTEN = [0, 10, 50, 100, 500, 1000, 2000, 5000, 10000, Infinity]
export const BUCKETS_KB_WRITTEN = [0, 128, 512, 1024, 5120, 10240, 20480, 51200, 102400, 204800, Infinity]
const S3_UPLOAD_WARN_TIME_SECONDS = 2 * 60 * 1000
// NOTE: To remove once released
const metricPrefix = 'v3_'
export const FILE_EXTENSION = '.jsonl'
export const BUFFER_FILE_NAME = `buffer${FILE_EXTENSION}`
export const FLUSH_FILE_EXTENSION = `.flush${FILE_EXTENSION}`
export const METADATA_FILE_NAME = `metadata.json`
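// Illustrative summary of the per-session on-disk layout implied by the constants above (the directory itself is
// created by the consumer as `<partition>/<teamId>__<sessionId>`):
//   buffer.jsonl                    - the active, append-only event buffer
//   metadata.json                   - the serialized buffer context (see syncMetadata below)
//   <firstTs>-<lastTs>.flush.jsonl  - buffers renamed for flushing, awaiting upload to S3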
const writeStreamBlocked = new Counter({
name: metricPrefix + 'recording_blob_ingestion_write_stream_blocked',
help: 'Number of times we get blocked by the stream backpressure',
})
const counterS3FilesWritten = new Counter({
name: metricPrefix + 'recording_s3_files_written',
help: 'A single file flushed to S3',
})
const counterS3WriteErrored = new Counter({
name: metricPrefix + 'recording_s3_write_errored',
help: 'Indicates that we failed to flush to S3 without recovering',
})
const bufferLoadFailedCounter = new Counter({
name: metricPrefix + 'recording_load_from_file_failed',
help: 'Indicates that we failed to load the file from disk',
})
const histogramS3LinesWritten = new Histogram({
name: metricPrefix + 'recording_s3_lines_written_histogram',
help: 'The number of lines in a file we send to s3',
buckets: BUCKETS_LINES_WRITTEN,
})
const histogramS3KbWritten = new Histogram({
name: metricPrefix + 'recording_blob_ingestion_s3_kb_written',
help: 'The uncompressed size of file we send to S3',
buckets: BUCKETS_KB_WRITTEN,
})
const histogramSessionAgeSeconds = new Histogram({
name: metricPrefix + 'recording_blob_ingestion_session_age_seconds',
help: 'The age of current sessions in seconds',
buckets: [0, 60, 60 * 2, 60 * 5, 60 * 8, 60 * 10, 60 * 12, 60 * 15, 60 * 20, Infinity],
})
const histogramSessionSizeKb = new Histogram({
name: metricPrefix + 'recording_blob_ingestion_session_size_kb',
help: 'The size of current sessions in kb',
buckets: BUCKETS_KB_WRITTEN,
})
const histogramFlushTimeSeconds = new Histogram({
name: metricPrefix + 'recording_blob_ingestion_session_flush_time_seconds',
help: 'The time taken to flush a session in seconds',
buckets: [0, 2, 5, 10, 20, 30, 60, 120, 180, 300, Infinity],
})
const histogramSessionSize = new Histogram({
name: metricPrefix + 'recording_blob_ingestion_session_lines',
help: 'The size of sessions in numbers of lines',
buckets: BUCKETS_LINES_WRITTEN,
})
const histogramBackpressureBlockedSeconds = new Histogram({
name: metricPrefix + 'recording_blob_ingestion_backpressure_blocked_seconds',
help: 'The time spent blocked on write stream backpressure in seconds',
buckets: [0, 2, 5, 10, 20, 30, 60, 120, 180, 300, Infinity],
})
export type SessionManagerBufferContext = {
sizeEstimate: number
count: number
eventsRange: {
firstTimestamp: number
lastTimestamp: number
} | null
createdAt: number
}
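// The buffer context above is what syncMetadata() serializes to metadata.json, e.g. roughly:
// { "sizeEstimate": 193, "count": 1, "eventsRange": { "firstTimestamp": 1700000000000, "lastTimestamp": 1700000001000 }, "createdAt": 1700000000000 }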
// Context that is updated and persisted to disk so must be serializable
export type SessionManagerContext = {
dir: string
sessionId: string
teamId: number
partition: number
}
export class SessionManagerV3 {
buffer?: SessionManagerBufferContext
bufferWriteStream?: WriteStream
flushPromise?: Promise<void>
destroying = false
inProgressUpload: Upload | null = null
flushJitterMultiplier: number
readonly setupPromise: Promise<void>
constructor(
public readonly serverConfig: PluginsServerConfig,
public readonly s3Client: ObjectStorage['s3'],
public readonly context: SessionManagerContext
) {
// We add a jitter multiplier to the buffer age so that we don't have all sessions flush at the same time
this.flushJitterMultiplier = 1 - Math.random() * serverConfig.SESSION_RECORDING_BUFFER_AGE_JITTER
this.setupPromise = this.setup()
}
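// Illustrative example (values assumed, not necessarily the defaults): with SESSION_RECORDING_BUFFER_AGE_JITTER = 0.1
// the multiplier falls in [0.9, 1], so a 5 minute SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS flushes the buffer somewhere
// between ~4.5 and 5 minutes, spreading flushes across sessions rather than all at once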
private file(name: string): string {
return path.join(this.context.dir, name)
}
private async setup(): Promise<void> {
await mkdir(this.context.dir, { recursive: true })
const bufferFileExists = await stat(this.file(BUFFER_FILE_NAME))
.then(() => true)
.catch(() => false)
let metadataFileContent: string | undefined
let context: SessionManagerBufferContext | undefined
if (!bufferFileExists) {
status.info('📦', '[session-manager] started new manager', {
...this.context,
...(this.buffer ?? {}),
})
return
}
try {
metadataFileContent = await readFile(this.file(METADATA_FILE_NAME), 'utf-8')
context = JSON.parse(metadataFileContent)
} catch (error) {
// Indicates no buffer metadata file or it's corrupted
status.error('🧨', '[session-manager] failed to read buffer metadata.json', {
...this.context,
error,
})
this.captureMessage('Failed to read buffer metadata.json', { error })
// NOTE: This is not ideal... we fallback to loading the buffer.jsonl and deriving metadata from that as best as possible
// If that still fails then we have to bail out and drop the buffer.jsonl (data loss...)
try {
const stats = await stat(this.file(BUFFER_FILE_NAME))
context = {
sizeEstimate: stats.size,
count: 1, // We can't afford to load the whole file into memory so we assume 1 line
eventsRange: {
firstTimestamp: Math.round(stats.birthtimeMs),
// This is really less than ideal but we don't have much choice
lastTimestamp: Date.now(),
},
createdAt: Math.round(stats.birthtimeMs),
}
} catch (error) {
status.error('🧨', '[session-manager] failed to determine metadata from buffer file', {
...this.context,
error,
})
}
}
if (!context) {
// Indicates we couldn't successfully read the metadata file
await unlink(this.file(METADATA_FILE_NAME)).catch(() => null)
await unlink(this.file(BUFFER_FILE_NAME)).catch(() => null)
bufferLoadFailedCounter.inc()
this.captureException(new Error('Failed to read buffer metadata. Resorted to hard deletion'), {
metadataFileContent,
})
return
}
this.buffer = context
status.info('📦', '[session-manager] started new manager from existing file', {
...this.context,
...(this.buffer ?? {}),
})
}
private async syncMetadata(): Promise<void> {
if (this.buffer) {
await writeFile(this.file(METADATA_FILE_NAME), JSON.stringify(this.buffer), 'utf-8')
} else {
await unlink(this.file(METADATA_FILE_NAME))
}
}
private async getFlushFiles(): Promise<string[]> {
return (await readdir(this.context.dir)).filter((file) => file.endsWith(FLUSH_FILE_EXTENSION))
}
private captureException(error: Error, extra: Record<string, any> = {}): void {
captureException(error, {
extra: { ...this.context, ...extra },
tags: { teamId: this.context.teamId, sessionId: this.context.sessionId },
})
}
private captureMessage(message: string, extra: Record<string, any> = {}): void {
const context = this.context
captureMessage(message, {
extra: { ...context, ...extra },
tags: { teamId: context.teamId, sessionId: context.sessionId },
})
}
public async add(message: IncomingRecordingMessage): Promise<void> {
if (this.destroying) {
return
}
await this.setupPromise
try {
const buffer = this.getOrCreateBuffer()
const content =
convertForPersistence(message.eventsByWindowId)
.map((x) => JSON.stringify(x))
.join('\n') + '\n'
buffer.count += 1
buffer.sizeEstimate += content.length
buffer.eventsRange = {
firstTimestamp:
minDefined(message.eventsRange.start, buffer.eventsRange?.firstTimestamp) ??
message.eventsRange.start,
lastTimestamp:
maxDefined(message.eventsRange.end, buffer.eventsRange?.lastTimestamp) ?? message.eventsRange.end,
}
if (!this.bufferWriteStream!.write(content, 'utf-8')) {
writeStreamBlocked.inc()
const stopTimer = histogramBackpressureBlockedSeconds.startTimer()
await new Promise((r) => this.bufferWriteStream!.once('drain', r))
stopTimer()
}
await this.syncMetadata()
} catch (error) {
this.captureException(error, { message })
throw error
}
}
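// Each add() call appends one JSON line per window id to buffer.jsonl; a line looks like (illustrative values):
// {"window_id":"window_id_1","data":[{"timestamp":1700000000000,"type":4,"data":{"href":"http://localhost:3001/"}}]}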
public async isEmpty(): Promise<boolean> {
return !this.buffer?.count && !(await this.getFlushFiles()).length
}
public async flush(force = false): Promise<void> {
if (this.destroying) {
return
}
await this.setupPromise
if (!force) {
await this.maybeFlushCurrentBuffer()
} else {
// This is mostly used by tests
await this.markCurrentBufferForFlush()
}
await this.flushFiles()
}
private async maybeFlushCurrentBuffer(): Promise<void> {
if (!this.buffer) {
return
}
if (this.buffer.sizeEstimate >= this.serverConfig.SESSION_RECORDING_MAX_BUFFER_SIZE_KB * 1024) {
return this.markCurrentBufferForFlush()
}
const flushThresholdMs = this.serverConfig.SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS * 1000
const flushThresholdJitteredMs = flushThresholdMs * this.flushJitterMultiplier
const logContext: Record<string, any> = {
...this.context,
flushThresholdMs,
flushThresholdJitteredMs,
}
if (!this.buffer.count) {
status.warn('🚽', `[session-manager] buffer has no items yet`, { logContext })
return
}
const bufferAgeInMemoryMs = now() - this.buffer.createdAt
// check the in-memory age against a larger value than the flush threshold,
// otherwise we'll flap between reasons for flushing when close to real-time processing
const isSessionAgeOverThreshold = bufferAgeInMemoryMs >= flushThresholdJitteredMs
logContext['bufferAgeInMemoryMs'] = bufferAgeInMemoryMs
logContext['isSessionAgeOverThreshold'] = isSessionAgeOverThreshold
histogramSessionAgeSeconds.observe(bufferAgeInMemoryMs / 1000)
histogramSessionSize.observe(this.buffer.count)
histogramSessionSizeKb.observe(this.buffer.sizeEstimate / 1024)
if (isSessionAgeOverThreshold) {
return this.markCurrentBufferForFlush()
}
}
private async markCurrentBufferForFlush(): Promise<void> {
const buffer = this.buffer
if (!buffer) {
// TODO: maybe error properly here?
return
}
if (!buffer.eventsRange || !buffer.count) {
// Indicates some issue with the buffer so we can close out
this.buffer = undefined
return
}
// ADD FLUSH METRICS HERE
const { firstTimestamp, lastTimestamp } = buffer.eventsRange
const fileName = `${firstTimestamp}-${lastTimestamp}${FLUSH_FILE_EXTENSION}`
histogramS3LinesWritten.observe(buffer.count)
histogramS3KbWritten.observe(buffer.sizeEstimate / 1024)
await new Promise<void>((resolve) => (this.bufferWriteStream ? this.bufferWriteStream.end(resolve) : resolve()))
await rename(this.file(BUFFER_FILE_NAME), this.file(fileName))
this.buffer = undefined
await this.syncMetadata()
}
private async flushFiles(): Promise<void> {
// We read all files marked for flushing and write them to S3
const filesToFlush = await this.getFlushFiles()
await Promise.all(filesToFlush.map((file) => this.flushFile(file)))
}
private async flushFile(filename: string): Promise<void> {
status.info('🚽', '[session-manager] flushing file to S3', {
filename,
...this.context,
})
if (this.destroying) {
status.warn('🚽', '[session-manager] flush called but we are in a destroying state', {
...this.context,
})
return
}
const file = this.file(filename)
const deleteFile = async () => {
await stat(file)
await unlink(file)
}
const endFlushTimer = histogramFlushTimeSeconds.startTimer()
try {
const targetFileName = filename.replace(FLUSH_FILE_EXTENSION, FILE_EXTENSION)
const baseKey = `${this.serverConfig.SESSION_RECORDING_REMOTE_FOLDER}/team_id/${this.context.teamId}/session_id/${this.context.sessionId}`
const dataKey = `${baseKey}/data/${targetFileName}`
const readStream = createReadStream(file)
const uploadStream = new PassThrough()
// The compressed file
pipeline(readStream, zlib.createGzip(), uploadStream).catch((error) => {
// TODO: If this actually happens we probably want to destroy the buffer as we will be stuck...
status.error('🧨', '[session-manager] writestream errored', {
...this.context,
error,
})
this.captureException(error)
})
readStream.on('error', (err) => {
// TODO: What should we do here?
status.error('🧨', '[session-manager] readstream errored', {
...this.context,
error: err,
})
this.captureException(err)
})
const inProgressUpload = (this.inProgressUpload = new Upload({
client: this.s3Client,
params: {
Bucket: this.serverConfig.OBJECT_STORAGE_BUCKET,
Key: dataKey,
ContentEncoding: 'gzip',
ContentType: 'application/jsonl',
Body: uploadStream,
},
}))
await asyncTimeoutGuard(
{
message: 'session-manager.flush uploading file to S3 delayed.',
timeout: S3_UPLOAD_WARN_TIME_SECONDS,
},
async () => {
await inProgressUpload.done()
}
)
counterS3FilesWritten.inc(1)
} catch (error: any) {
// TRICKY: error can for some reason sometimes be undefined...
error = error || new Error('Unknown Error')
if (error.name === 'AbortError' && this.destroying) {
// abort of inProgressUpload while destroying is expected
return
}
await this.inProgressUpload?.abort()
// TODO: If we fail to write to S3 we should do something about it
status.error('🧨', '[session-manager] failed writing session recording blob to S3', {
errorMessage: `${error.name || 'Unknown Error Type'}: ${error.message}`,
error,
...this.context,
})
this.captureException(error)
counterS3WriteErrored.inc()
throw error
} finally {
endFlushTimer()
await deleteFile()
}
}
private getOrCreateBuffer(): SessionManagerBufferContext {
if (!this.buffer) {
try {
const buffer: SessionManagerBufferContext = {
sizeEstimate: 0,
count: 0,
eventsRange: null,
createdAt: now(),
}
this.buffer = buffer
} catch (error) {
this.captureException(error)
throw error
}
}
if (this.buffer && !this.bufferWriteStream) {
this.bufferWriteStream = this.createFileStreamFor(path.join(this.context.dir, BUFFER_FILE_NAME))
}
return this.buffer
}
protected createFileStreamFor(file: string): WriteStream {
return createWriteStream(file, {
// Opens in append mode in case it already exists
flags: 'a',
encoding: 'utf-8',
})
}
public async stop(): Promise<void> {
this.destroying = true
if (this.inProgressUpload !== null) {
await this.inProgressUpload.abort().catch((error) => {
status.error('🧨', '[session-manager][realtime] failed to abort in progress upload', {
...this.context,
error,
})
this.captureException(error)
})
this.inProgressUpload = null
}
await new Promise<void>((resolve) => (this.bufferWriteStream ? this.bufferWriteStream.end(resolve) : resolve()))
if (await this.isEmpty()) {
status.info('🧨', '[session-manager] removing empty session directory', {
...this.context,
})
await rmdir(this.context.dir, { recursive: true })
}
}
}
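In summary, the removed SessionManagerV3 appended incoming events to buffer.jsonl, renamed full or stale buffers to a .flush.jsonl file, and gzip-uploaded those files to S3. A minimal sketch of the naming it used, with illustrative values (the key layout mirrors flushFile() and markCurrentBufferForFlush() above):

const remoteFolder = 'session_recordings' // SESSION_RECORDING_REMOTE_FOLDER (value as in the tests further down)
const teamId = 1
const sessionId = 'example-session-id'
const firstTimestamp = 1700000000000
const lastTimestamp = 1700000004000
const flushFile = `${firstTimestamp}-${lastTimestamp}.flush.jsonl` // buffer.jsonl is renamed to this on flush
const dataKey = `${remoteFolder}/team_id/${teamId}/session_id/${sessionId}/data/${firstTimestamp}-${lastTimestamp}.jsonl`
// The flush file is piped through zlib.createGzip() and uploaded to the configured bucket under dataKey with ContentEncoding: gzip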


@@ -1,525 +0,0 @@
import { captureException } from '@sentry/node'
import { createReadStream } from 'fs'
import { readdir, stat } from 'fs/promises'
import { features, KafkaConsumer, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka'
import path from 'path'
import { Counter, Gauge, Histogram } from 'prom-client'
import { sessionRecordingConsumerConfig } from '../../../config/config'
import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics'
import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../kafka/config'
import { createKafkaProducer } from '../../../kafka/producer'
import { PluginsServerConfig, TeamId } from '../../../types'
import { BackgroundRefresher } from '../../../utils/background-refresher'
import { KafkaProducerWrapper } from '../../../utils/db/kafka-producer-wrapper'
import { PostgresRouter } from '../../../utils/db/postgres'
import { status } from '../../../utils/status'
import { fetchTeamTokensWithRecordings } from '../../../worker/ingestion/team-manager'
import { expressApp } from '../../services/http-server'
import { ObjectStorage } from '../../services/object_storage'
import { runInstrumentedFunction } from '../../utils'
import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics'
import { ConsoleLogsIngester } from './services/console-logs-ingester'
import { ReplayEventsIngester } from './services/replay-events-ingester'
import { BUCKETS_KB_WRITTEN, BUFFER_FILE_NAME, SessionManagerV3 } from './services/session-manager-v3'
import { IncomingRecordingMessage } from './types'
import { allSettledWithConcurrency, parseKafkaMessage, reduceRecordingMessages } from './utils'
// Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals
require('@sentry/tracing')
// WARNING: Do not change this - it will essentially reset the consumer
const KAFKA_CONSUMER_GROUP_ID = 'session-replay-ingester'
const KAFKA_CONSUMER_SESSION_TIMEOUT_MS = 60000
// NOTE: To remove once released
const metricPrefix = 'v3_'
const gaugeSessionsHandled = new Gauge({
name: metricPrefix + 'recording_blob_ingestion_session_manager_count',
help: 'A gauge of the number of sessions being handled by this blob ingestion consumer',
})
const histogramKafkaBatchSize = new Histogram({
name: metricPrefix + 'recording_blob_ingestion_kafka_batch_size',
help: 'The size of the batches we are receiving from Kafka',
buckets: [0, 50, 100, 250, 500, 750, 1000, 1500, 2000, 3000, Infinity],
})
const histogramKafkaBatchSizeKb = new Histogram({
name: metricPrefix + 'recording_blob_ingestion_kafka_batch_size_kb',
help: 'The size in kb of the batches we are receiving from Kafka',
buckets: BUCKETS_KB_WRITTEN,
})
const counterKafkaMessageReceived = new Counter({
name: metricPrefix + 'recording_blob_ingestion_kafka_message_received',
help: 'The number of messages we have received from Kafka',
labelNames: ['partition'],
})
export interface TeamIDWithConfig {
teamId: TeamId | null
consoleLogIngestionEnabled: boolean
}
/**
* @deprecated Delete reduceRecordingMessages and associated tests when deleting this.
*
* The SessionRecordingIngesterV3
* relies on EFS network storage to avoid the need to delay kafka commits and instead uses the disk
* as the persistent volume for both blob data and the metadata around ingestion.
*/
export class SessionRecordingIngesterV3 {
sessions: Record<string, SessionManagerV3> = {}
replayEventsIngester?: ReplayEventsIngester
consoleLogsIngester?: ConsoleLogsIngester
batchConsumer?: BatchConsumer
teamsRefresher: BackgroundRefresher<Record<string, TeamIDWithConfig>>
config: PluginsServerConfig
topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS
isStopping = false
private promises: Set<Promise<any>> = new Set()
// if ingestion is lagging on a single partition it is often hard to identify _why_,
// this allows us to output more information for that partition
private debugPartition: number | undefined = undefined
private sharedClusterProducerWrapper: KafkaProducerWrapper | undefined
constructor(
private globalServerConfig: PluginsServerConfig,
private postgres: PostgresRouter,
private objectStorage: ObjectStorage
) {
this.debugPartition = globalServerConfig.SESSION_RECORDING_DEBUG_PARTITION
? parseInt(globalServerConfig.SESSION_RECORDING_DEBUG_PARTITION)
: undefined
// NOTE: globalServerConfig contains the default pluginServer values, typically not pointing at dedicated resources like kafka or redis
// We still connect to some of the non-dedicated resources such as postgres or the Replay events kafka.
this.config = sessionRecordingConsumerConfig(globalServerConfig)
this.teamsRefresher = new BackgroundRefresher(async () => {
try {
status.info('🔁', 'session-replay-ingestion - refreshing teams in the background')
return await fetchTeamTokensWithRecordings(this.postgres)
} catch (e) {
status.error('🔥', 'session-replay-ingestion - failed to refresh teams in the background', e)
captureException(e)
throw e
}
})
}
private get rootDir() {
return path.join(this.config.SESSION_RECORDING_LOCAL_DIRECTORY, 'session-recordings')
}
private dirForSession(partition: number, teamId: number, sessionId: string): string {
return path.join(this.rootDir, `${partition}`, `${teamId}__${sessionId}`)
}
private get connectedBatchConsumer(): KafkaConsumer | undefined {
// Helper to only use the batch consumer if we are actually connected to it - otherwise it will throw errors
const consumer = this.batchConsumer?.consumer
return consumer && consumer.isConnected() ? consumer : undefined
}
private get assignedTopicPartitions(): TopicPartition[] {
return this.connectedBatchConsumer?.assignments() ?? []
}
private get assignedPartitions(): TopicPartition['partition'][] {
return this.assignedTopicPartitions.map((x) => x.partition)
}
private scheduleWork<T>(promise: Promise<T>): Promise<T> {
/**
* Helper to handle graceful shutdowns. Every time we do some work we add a promise to this array and remove it when finished.
* That way when shutting down we can wait for all promises to finish before exiting.
*/
this.promises.add(promise)
// we void the promise returned by finally here to avoid the need to await it
void promise.finally(() => this.promises.delete(promise))
return promise
}
public async consume(event: IncomingRecordingMessage): Promise<void> {
const { team_id, session_id } = event
const key = `${team_id}__${session_id}`
const { partition } = event.metadata
if (this.debugPartition === partition) {
status.info('🔁', '[session-replay-ingestion] - [PARTITION DEBUG] - consuming event', {
...event.metadata,
})
}
if (!this.sessions[key]) {
const { partition } = event.metadata
// NOTE: It's important that this stays sync so that parallel calls will not create multiple session managers
this.sessions[key] = new SessionManagerV3(this.config, this.objectStorage.s3, {
teamId: team_id,
sessionId: session_id,
dir: this.dirForSession(partition, team_id, session_id),
partition,
})
}
await this.sessions[key]?.add(event)
}
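// e.g. team 2, session 'some-session-uuid' on partition 3 gets the key '2__some-session-uuid' and is buffered under
// <SESSION_RECORDING_LOCAL_DIRECTORY>/session-recordings/3/2__some-session-uuid (illustrative; see dirForSession above)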
public async handleEachBatch(messages: Message[], heartbeat: () => void): Promise<void> {
status.info('🔁', `session-replay-ingestion - handling batch`, {
size: messages.length,
partitionsInBatch: [...new Set(messages.map((x) => x.partition))],
assignedPartitions: this.assignedTopicPartitions.map((x) => x.partition),
sessionsHandled: Object.keys(this.sessions).length,
})
await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch`,
logExecutionTime: true,
func: async () => {
histogramKafkaBatchSize.observe(messages.length)
histogramKafkaBatchSizeKb.observe(messages.reduce((acc, m) => (m.value?.length ?? 0) + acc, 0) / 1024)
let recordingMessages: IncomingRecordingMessage[] = []
await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.parseKafkaMessages`,
func: async () => {
for (const message of messages) {
counterKafkaMessageReceived.inc({ partition: message.partition })
const recordingMessage = await parseKafkaMessage(
message,
(token) =>
this.teamsRefresher.get().then((teams) => ({
teamId: teams[token]?.teamId || null,
consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true,
})),
// v3 consumer does not emit ingestion warnings
undefined
)
if (recordingMessage) {
recordingMessages.push(recordingMessage)
}
}
recordingMessages = reduceRecordingMessages(recordingMessages)
},
})
heartbeat()
await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.ensureSessionsAreLoaded`,
func: async () => {
await this.syncSessionsWithDisk(heartbeat)
},
})
heartbeat()
await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.consumeBatch`,
func: async () => {
if (this.config.SESSION_RECORDING_PARALLEL_CONSUMPTION) {
await Promise.all(recordingMessages.map((x) => this.consume(x).then(heartbeat)))
} else {
for (const message of recordingMessages) {
await this.consume(message)
}
}
},
})
heartbeat()
await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.flushAllReadySessions`,
func: async () => {
// TODO: This can time out if it ends up being overloaded - we should have a max limit here
await this.flushAllReadySessions(heartbeat)
},
})
if (this.replayEventsIngester) {
await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.consumeReplayEvents`,
func: async () => {
await this.replayEventsIngester!.consumeBatch(recordingMessages)
},
})
}
if (this.consoleLogsIngester) {
await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.consumeConsoleLogEvents`,
func: async () => {
await this.consoleLogsIngester!.consumeBatch(recordingMessages)
},
})
}
},
})
}
public async start(): Promise<void> {
status.info('🔁', 'session-replay-ingestion - starting session recordings blob consumer', {
librdKafkaVersion: librdkafkaVersion,
kafkaCapabilities: features,
})
this.setupHttpRoutes()
// Load teams into memory
await this.teamsRefresher.refresh()
// NOTE: This is the only place where we need to use the shared server config
const globalConnectionConfig = createRdConnectionConfigFromEnvVars(this.globalServerConfig)
const globalProducerConfig = createRdProducerConfigFromEnvVars(this.globalServerConfig)
this.sharedClusterProducerWrapper = new KafkaProducerWrapper(
await createKafkaProducer(globalConnectionConfig, globalProducerConfig)
)
this.sharedClusterProducerWrapper.producer.connect()
// NOTE: This is the only place where we need to use the shared server config
if (this.config.SESSION_RECORDING_CONSOLE_LOGS_INGESTION_ENABLED) {
this.consoleLogsIngester = new ConsoleLogsIngester(this.sharedClusterProducerWrapper.producer)
}
if (this.config.SESSION_RECORDING_REPLAY_EVENTS_INGESTION_ENABLED) {
this.replayEventsIngester = new ReplayEventsIngester(this.sharedClusterProducerWrapper.producer)
}
// Create a node-rdkafka consumer that fetches batches of messages, runs
// eachBatchWithContext, then commits offsets for the batch.
// the batch consumer reads from the session replay kafka cluster
const replayClusterConnectionConfig = createRdConnectionConfigFromEnvVars(this.config)
this.batchConsumer = await startBatchConsumer({
connectionConfig: replayClusterConnectionConfig,
groupId: KAFKA_CONSUMER_GROUP_ID,
topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
autoCommit: true, // NOTE: This is the crucial difference between this and the other consumer
sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS,
maxPollIntervalMs: this.config.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS,
// the largest size of a message that can be fetched by the consumer.
// the largest size our MSK cluster allows is 20MB
// we only use 9 or 10MB but there's no reason to limit this 🤷️
consumerMaxBytes: this.config.KAFKA_CONSUMPTION_MAX_BYTES,
consumerMaxBytesPerPartition: this.config.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
// our messages are very big, so we don't want to buffer too many
queuedMinMessages: this.config.SESSION_RECORDING_KAFKA_QUEUE_SIZE,
consumerMaxWaitMs: this.config.KAFKA_CONSUMPTION_MAX_WAIT_MS,
consumerErrorBackoffMs: this.config.KAFKA_CONSUMPTION_ERROR_BACKOFF_MS,
fetchBatchSize: this.config.SESSION_RECORDING_KAFKA_BATCH_SIZE,
batchingTimeoutMs: this.config.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS,
topicCreationTimeoutMs: this.config.KAFKA_TOPIC_CREATION_TIMEOUT_MS,
eachBatch: async (messages, { heartbeat }) => {
return await this.scheduleWork(this.handleEachBatch(messages, heartbeat))
},
callEachBatchWhenEmpty: true, // Useful as we will still want to account for flushing sessions
debug: this.config.SESSION_RECORDING_KAFKA_DEBUG,
})
addSentryBreadcrumbsEventListeners(this.batchConsumer.consumer)
this.batchConsumer.consumer.on('disconnected', async (err) => {
// since we can't be guaranteed that the consumer will be stopped before some other code calls disconnect
// we need to listen to disconnect and make sure we're stopped
status.info('🔁', 'session-replay-ingestion batch consumer disconnected, cleaning up', { err })
await this.stop()
})
}
public async stop(): Promise<PromiseSettledResult<any>[]> {
this.isStopping = true
status.info('🔁', 'session-replay-ingestion - stopping')
// NOTE: We have to get the partitions before we stop the consumer as it throws if disconnected
// const assignedPartitions = this.assignedTopicPartitions
// Mark as stopping so that we don't actually process any more incoming messages, but still keep the process alive
await this.batchConsumer?.stop()
void this.scheduleWork(
Promise.allSettled(
Object.entries(this.sessions).map(([key, sessionManager]) => this.destroySession(key, sessionManager))
)
)
const promiseResults = await Promise.allSettled(this.promises)
if (this.sharedClusterProducerWrapper) {
await this.sharedClusterProducerWrapper.disconnect()
}
status.info('👍', 'session-replay-ingestion - stopped!')
return promiseResults
}
public isHealthy() {
// TODO: Maybe extend this to check if we are shutting down so we don't get killed early.
return this.batchConsumer?.isHealthy()
}
async flushAllReadySessions(heartbeat: () => void): Promise<void> {
const sessions = Object.entries(this.sessions)
// NOTE: We want to avoid flushing too many sessions at once as it can cause a lot of disk backpressure stalling the consumer
await allSettledWithConcurrency(
this.config.SESSION_RECORDING_MAX_PARALLEL_FLUSHES,
sessions,
async ([key, sessionManager], ctx) => {
heartbeat()
if (this.isStopping) {
// We can end up with a large number of flushes. We want to stop early if we hit shutdown
return ctx.break()
}
if (!this.assignedPartitions.includes(sessionManager.context.partition)) {
await this.destroySession(key, sessionManager)
return
}
await sessionManager
.flush()
.catch((err) => {
status.error(
'🚽',
'session-replay-ingestion - failed trying to flush on idle session: ' +
sessionManager.context.sessionId,
{
err,
session_id: sessionManager.context.sessionId,
}
)
captureException(err, { tags: { session_id: sessionManager.context.sessionId } })
})
.then(async () => {
// If the SessionManager is done (flushed and with no more queued events) then we remove it to free up memory
if (await sessionManager.isEmpty()) {
await this.destroySession(key, sessionManager)
}
})
}
)
gaugeSessionsHandled.set(Object.keys(this.sessions).length)
}
private async syncSessionsWithDisk(heartbeat: () => void): Promise<void> {
// NOTE: With a lot of files on disk this can take a long time
// We need to ensure that as we loop we double check that we are still in charge of the partitions
// TODO: Implement this (and also for flushing) so that it syncs the assigned partitions with the current state of the consumer
// As we may get assigned and reassigned partitions, we want to make sure that we have all sessions loaded into memory
for (const partition of this.assignedPartitions) {
const keys = await readdir(path.join(this.rootDir, `${partition}`)).catch(() => {
// This happens if there are no files on disk for that partition yet
return []
})
const relatedKeys = keys.filter((x) => /\d+__[a-zA-Z0-9\-]+/.test(x))
for (const key of relatedKeys) {
// TODO: Ensure sessionId can only be a uuid
const [teamId, sessionId] = key.split('__')
if (this.isStopping) {
// We can end up with a large number of files we are processing. We want to stop early if we hit shutdown
return
}
if (!this.assignedPartitions.includes(partition)) {
// Account for rebalances
continue
}
if (!this.sessions[key]) {
this.sessions[key] = new SessionManagerV3(this.config, this.objectStorage.s3, {
teamId: parseInt(teamId),
sessionId,
dir: this.dirForSession(partition, parseInt(teamId), sessionId),
partition,
})
await this.sessions[key].setupPromise
}
heartbeat()
}
}
}
private async destroySession(key: string, sessionManager: SessionManagerV3): Promise<void> {
delete this.sessions[key]
await sessionManager.stop()
}
private setupHttpRoutes() {
// Mimic the app server's endpoint
expressApp.get(
'/api/projects/:projectId/session_recordings/:sessionId/snapshots',
async (req: any, res: any) => {
await runInstrumentedFunction({
statsKey: `recordingingester.http.getSnapshots`,
func: async () => {
try {
const startTime = Date.now()
res.on('finish', function () {
status.info('⚡️', `GET ${req.url} - ${res.statusCode} - ${Date.now() - startTime}ms`)
})
// validate that projectId is a number and sessionId is UUID like
const projectId = parseInt(req.params.projectId)
if (isNaN(projectId)) {
res.sendStatus(404)
return
}
const sessionId = req.params.sessionId
if (!/^[0-9a-f-]+$/.test(sessionId)) {
res.sendStatus(404)
return
}
status.info('🔁', 'session-replay-ingestion - fetching session', { projectId, sessionId })
// We don't know the partition upfront so we have to recursively check all partitions
const partitions = await readdir(this.rootDir).catch(() => [])
for (const partition of partitions) {
const sessionDir = this.dirForSession(parseInt(partition), projectId, sessionId)
const exists = await stat(sessionDir).catch(() => null)
if (!exists) {
continue
}
const fileStream = createReadStream(path.join(sessionDir, BUFFER_FILE_NAME))
fileStream.pipe(res)
return
}
res.sendStatus(404)
} catch (e) {
status.error('🔥', 'session-replay-ingestion - failed to fetch session', e)
res.sendStatus(500)
}
},
})
}
)
}
}
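The snapshots route above lets the app stream a session's still-buffered events straight from the ingester's disk. A minimal sketch of calling it, assuming a placeholder host and port (the HTTP listener address is not part of this diff):

async function fetchInFlightSnapshots(projectId: number, sessionId: string): Promise<string | null> {
    // Route shape matches setupHttpRoutes() above; host/port are placeholders
    const res = await fetch(`http://plugin-server:6738/api/projects/${projectId}/session_recordings/${sessionId}/snapshots`)
    if (!res.ok) {
        return null // the ingester responds 404 when no partition directory on this node contains the session
    }
    return await res.text() // newline-delimited contents of the session's buffer.jsonl
}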


@@ -7,7 +7,6 @@ import { Counter } from 'prom-client'
import { PipelineEvent, RawEventMessage, RRWebEvent } from '../../../types'
import { KafkaProducerWrapper } from '../../../utils/db/kafka-producer-wrapper'
import { status } from '../../../utils/status'
import { cloneObject } from '../../../utils/utils'
import { captureIngestionWarning } from '../../../worker/ingestion/utils'
import { eventDroppedCounter } from '../metrics'
import { TeamIDWithConfig } from './session-recordings-consumer'
@@ -359,52 +358,6 @@
}
}
/**
* @deprecated Delete when removing session-recordings-consumer-v3
*/
export const reduceRecordingMessages = (messages: IncomingRecordingMessage[]): IncomingRecordingMessage[] => {
/**
* It can happen that a single batch contains all messages for the same session.
* A big perf win here is to group everything up front and then reduce the messages
* to a single message per session.
*/
const reducedMessages: Record<string, IncomingRecordingMessage> = {}
for (const message of messages) {
const key = `${message.team_id}-${message.session_id}`
if (!reducedMessages[key]) {
reducedMessages[key] = cloneObject(message)
} else {
const existingMessage = reducedMessages[key]
for (const [windowId, events] of Object.entries(message.eventsByWindowId)) {
if (existingMessage.eventsByWindowId[windowId]) {
existingMessage.eventsByWindowId[windowId].push(...events)
} else {
existingMessage.eventsByWindowId[windowId] = events
}
}
existingMessage.metadata.rawSize += message.metadata.rawSize
// Update the events ranges
existingMessage.metadata.lowOffset = Math.min(
existingMessage.metadata.lowOffset,
message.metadata.lowOffset
)
existingMessage.metadata.highOffset = Math.max(
existingMessage.metadata.highOffset,
message.metadata.highOffset
)
// Update the events ranges
existingMessage.eventsRange.start = Math.min(existingMessage.eventsRange.start, message.eventsRange.start)
existingMessage.eventsRange.end = Math.max(existingMessage.eventsRange.end, message.eventsRange.end)
}
}
return Object.values(reducedMessages)
}
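// Illustrative example: two messages for team 1 / session '1' at offsets 1 and 2 reduce to a single message whose
// eventsByWindowId arrays are concatenated, with lowOffset 1, highOffset 2, an eventsRange spanning both, and rawSize summed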
export const convertForPersistence = (
messages: IncomingRecordingMessage['eventsByWindowId']
): PersistedRecordingMessage[] => {


@@ -39,7 +39,6 @@ import {
} from './ingestion-queues/on-event-handler-consumer'
import { startScheduledTasksConsumer } from './ingestion-queues/scheduled-tasks-consumer'
import { SessionRecordingIngester } from './ingestion-queues/session-recording/session-recordings-consumer'
import { SessionRecordingIngesterV3 } from './ingestion-queues/session-recording/session-recordings-consumer-v3'
import { setupCommonRoutes } from './services/http-server'
import { getObjectStorage } from './services/object_storage'
@@ -482,27 +481,6 @@
}
}
if (capabilities.sessionRecordingV3Ingestion) {
const recordingConsumerConfig = sessionRecordingConsumerConfig(serverConfig)
const postgres = hub?.postgres ?? new PostgresRouter(serverConfig)
const s3 = hub?.objectStorage ?? getObjectStorage(recordingConsumerConfig)
if (!s3) {
throw new Error("Can't start session recording ingestion without object storage")
}
// NOTE: We intentionally pass in the original serverConfig as the ingester uses both kafkas
const ingester = new SessionRecordingIngesterV3(serverConfig, postgres, s3)
await ingester.start()
const batchConsumer = ingester.batchConsumer
if (batchConsumer) {
stopSessionRecordingBlobConsumer = () => ingester.stop()
shutdownOnConsumerExit(batchConsumer)
healthChecks['session-recordings-ingestion'] = () => ingester.isHealthy() ?? false
}
}
if (capabilities.personOverrides) {
const postgres = hub?.postgres ?? new PostgresRouter(serverConfig)
const kafkaProducer = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig))


@@ -78,7 +78,6 @@ export enum PluginServerMode {
analytics_ingestion = 'analytics-ingestion',
recordings_blob_ingestion = 'recordings-blob-ingestion',
recordings_blob_ingestion_overflow = 'recordings-blob-ingestion-overflow',
recordings_ingestion_v3 = 'recordings-ingestion-v3',
person_overrides = 'person-overrides',
}
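// NOTE: a dedicated consumer in this mode was typically selected by setting PLUGIN_SERVER_MODE to the enum value
// above, e.g. PLUGIN_SERVER_MODE=recordings-ingestion-v3 (see the capability switch earlier in this diff)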
@@ -311,7 +310,6 @@ export interface PluginServerCapabilities {
processAsyncWebhooksHandlers?: boolean
sessionRecordingBlobIngestion?: boolean
sessionRecordingBlobOverflowIngestion?: boolean
sessionRecordingV3Ingestion?: boolean
personOverrides?: boolean
appManagementSingleton?: boolean
preflightSchedules?: boolean // Used for instance health checks on hobby deploy, not useful on cloud


@@ -1,103 +1,5 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP
exports[` 1`] = `
Array [
Object {
"distinct_id": "1",
"eventsByWindowId": Object {
"window_1": Array [
Object {
"data": Object {},
"timestamp": 1,
"type": 1,
},
Object {
"data": Object {},
"timestamp": 2,
"type": 2,
},
],
"window_2": Array [
Object {
"data": Object {},
"timestamp": 3,
"type": 3,
},
],
},
"eventsRange": Object {
"end": 3,
"start": 1,
},
"metadata": Object {
"highOffset": 3,
"lowOffset": 1,
"partition": 1,
"rawSize": 12,
"timestamp": 1,
"topic": "the_topic",
},
"session_id": "1",
"snapshot_source": null,
"team_id": 1,
},
Object {
"distinct_id": "1",
"eventsByWindowId": Object {
"window_1": Array [
Object {
"data": Object {},
"timestamp": 4,
"type": 4,
},
],
},
"eventsRange": Object {
"end": 4,
"start": 4,
},
"metadata": Object {
"highOffset": 4,
"lowOffset": 4,
"partition": 1,
"rawSize": 30,
"timestamp": 4,
"topic": "the_topic",
},
"session_id": "1",
"snapshot_source": null,
"team_id": 2,
},
Object {
"distinct_id": "1",
"eventsByWindowId": Object {
"window_1": Array [
Object {
"data": Object {},
"timestamp": 5,
"type": 5,
},
],
},
"eventsRange": Object {
"end": 5,
"start": 5,
},
"metadata": Object {
"highOffset": 5,
"lowOffset": 5,
"partition": 1,
"rawSize": 31,
"timestamp": 5,
"topic": "the_topic",
},
"session_id": "2",
"snapshot_source": null,
"team_id": 1,
},
]
`;
exports[`session-recording utils parseKafkaBatch can parse and reduce a batch of messages 1`] = `
Array [
Object {


@@ -1,319 +0,0 @@
import { Upload } from '@aws-sdk/lib-storage'
import { randomUUID } from 'crypto'
import fs from 'fs/promises'
import { DateTime, Settings } from 'luxon'
import path from 'path'
import { PassThrough } from 'stream'
import * as zlib from 'zlib'
import { defaultConfig } from '../../../../../src/config/config'
import { SessionManagerV3 } from '../../../../../src/main/ingestion-queues/session-recording/services/session-manager-v3'
import { now } from '../../../../../src/main/ingestion-queues/session-recording/utils'
import { createIncomingRecordingMessage } from '../fixtures'
jest.mock('@aws-sdk/lib-storage', () => {
const mockUpload = jest.fn().mockImplementation(() => {
return {
abort: jest.fn().mockResolvedValue(undefined),
done: jest.fn().mockResolvedValue(undefined),
}
})
return {
__esModule: true,
Upload: mockUpload,
}
})
const tmpDir = path.join(__dirname, '../../../../../.tmp/test_session_recordings')
describe('session-manager', () => {
jest.setTimeout(1000)
let sessionManager: SessionManagerV3
const mockS3Client: any = {
send: jest.fn(),
}
const createSessionManager = async (
sessionId = randomUUID(),
teamId = 1,
partition = 1
): Promise<SessionManagerV3> => {
const manager = new SessionManagerV3(defaultConfig, mockS3Client, {
sessionId,
teamId,
partition,
dir: path.join(tmpDir, `${partition}`, `${teamId}__${sessionId}`),
})
await manager.setupPromise
return manager
}
const flushThreshold = defaultConfig.SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS * 1000
beforeEach(async () => {
// it's always May 25
Settings.now = () => new Date(2018, 4, 25).valueOf()
if (await fs.stat(tmpDir).catch(() => null)) {
await fs.rmdir(tmpDir, { recursive: true })
}
sessionManager = await createSessionManager()
})
afterEach(async () => {
await sessionManager?.stop()
// it's no longer always May 25
Settings.now = () => new Date().valueOf()
})
it('adds a message', async () => {
const timestamp = now()
const event = createIncomingRecordingMessage({
eventsRange: {
start: timestamp,
end: timestamp + 1000,
},
eventsByWindowId: {
window_id_1: [
{ timestamp: timestamp, type: 4, data: { href: 'http://localhost:3001/' } },
{ timestamp: timestamp + 1000, type: 4, data: { href: 'http://localhost:3001/' } },
],
},
})
await sessionManager.add(event)
expect(sessionManager.buffer).toEqual({
sizeEstimate: 193,
count: 1,
eventsRange: { firstTimestamp: timestamp, lastTimestamp: timestamp + 1000 },
createdAt: timestamp,
})
const stats = await fs.stat(`${sessionManager.context.dir}/buffer.jsonl`)
expect(stats.size).toBeGreaterThan(0)
})
it('does not flush if it has received a message recently', async () => {
const now = DateTime.now()
const event = createIncomingRecordingMessage({
metadata: {
timestamp: now,
} as any,
})
await sessionManager.add(event)
await sessionManager.flush()
expect(await fs.readdir(sessionManager.context.dir)).toEqual(['buffer.jsonl', 'metadata.json'])
})
it('does flush if the stored file is older than the threshold', async () => {
const firstTimestamp = 1700000000000
const lastTimestamp = 1700000000000 + 4000
const eventOne = createIncomingRecordingMessage({
eventsRange: {
start: firstTimestamp,
end: firstTimestamp,
},
eventsByWindowId: {
window_id_1: [{ timestamp: firstTimestamp, type: 4, data: { href: 'http://localhost:3001/' } }],
},
})
const eventTwo = createIncomingRecordingMessage({
eventsRange: {
start: lastTimestamp,
end: lastTimestamp,
},
eventsByWindowId: {
window_id_1: [{ timestamp: lastTimestamp, type: 4, data: { href: 'http://localhost:3001/' } }],
},
})
await sessionManager.add(eventOne)
await sessionManager.add(eventTwo)
sessionManager.buffer!.createdAt = now() - flushThreshold - 1
await sessionManager.flush()
expect(await fs.readdir(sessionManager.context.dir)).toEqual([])
const sessionId = sessionManager.context.sessionId
// as a proxy for flush having been called or not
const mockUploadCalls = (Upload as unknown as jest.Mock).mock.calls
expect(mockUploadCalls.length).toBe(1)
expect(mockUploadCalls[0].length).toBe(1)
expect(mockUploadCalls[0][0]).toEqual(
expect.objectContaining({
params: expect.objectContaining({
Key: `session_recordings/team_id/1/session_id/${sessionId}/data/${firstTimestamp}-${lastTimestamp}.jsonl`,
}),
})
)
})
it('has a fixed jitter based on the serverConfig', async () => {
const minJitter = 1 - defaultConfig.SESSION_RECORDING_BUFFER_AGE_JITTER
for (const _ of Array(100).keys()) {
const sm = await createSessionManager()
expect(sm.flushJitterMultiplier).toBeGreaterThanOrEqual(minJitter)
expect(sm.flushJitterMultiplier).toBeLessThanOrEqual(1)
}
})
it('does not remove files when stopped', async () => {
expect(await fs.readdir(sessionManager.context.dir)).toEqual([])
await sessionManager.add(createIncomingRecordingMessage())
expect(await fs.readdir(sessionManager.context.dir)).toEqual(['buffer.jsonl', 'metadata.json'])
await sessionManager.stop()
expect(await fs.readdir(sessionManager.context.dir)).toEqual(['buffer.jsonl', 'metadata.json'])
})
it('removes the directory when stopped after fully flushed', async () => {
const sm = await createSessionManager('session_id_2', 2, 2)
expect(await fs.readdir(sm.context.dir)).toEqual([])
await sm.add(createIncomingRecordingMessage())
expect(await fs.readdir(sm.context.dir)).toEqual(['buffer.jsonl', 'metadata.json'])
await sm.flush(true)
expect(await fs.readdir(sm.context.dir)).toEqual([])
await sm.stop()
// ;(sessionManager as any) = undefined // Stop the afterEach from failing
await expect(fs.stat(sm.context.dir)).rejects.toThrowError('ENOENT: no such file or directory')
})
it('reads successfully with the stream not closed', async () => {
const event = createIncomingRecordingMessage({
eventsByWindowId: {
window_id_1: [
{ timestamp: 170000000, type: 4, data: { href: 'http://localhost:3001/' } },
{ timestamp: 170000000 + 1000, type: 4, data: { href: 'http://localhost:3001/' } },
],
},
})
await sessionManager.add(event)
const content = await fs.readFile(`${sessionManager.context.dir}/buffer.jsonl`, 'utf-8')
expect(content).toEqual(
'{"window_id":"window_id_1","data":[{"timestamp":170000000,"type":4,"data":{"href":"http://localhost:3001/"}},{"timestamp":170001000,"type":4,"data":{"href":"http://localhost:3001/"}}]}\n'
)
})
it('adds to the existing buffer when restarted', async () => {
const sm1 = await createSessionManager('session_id_2', 2, 2)
await sm1.add(
createIncomingRecordingMessage({
eventsByWindowId: {
window_id_1: [
{ timestamp: 170000000, type: 4, data: { href: 'http://localhost:3001/' } },
{ timestamp: 170000000 + 1000, type: 4, data: { href: 'http://localhost:3001/' } },
],
},
})
)
const sm2 = await createSessionManager('session_id_2', 2, 2)
await sm2.add(
createIncomingRecordingMessage({
eventsByWindowId: {
window_id_1: [
{ timestamp: 170000000 + 2000, type: 4, data: { href: 'http://localhost:3001/' } },
{ timestamp: 170000000 + 3000, type: 4, data: { href: 'http://localhost:3001/' } },
],
},
})
)
const buffer = await fs.readFile(`${sm1.context.dir}/buffer.jsonl`, 'utf-8')
expect(buffer).toEqual(
'{"window_id":"window_id_1","data":[{"timestamp":170000000,"type":4,"data":{"href":"http://localhost:3001/"}},{"timestamp":170001000,"type":4,"data":{"href":"http://localhost:3001/"}}]}\n{"window_id":"window_id_1","data":[{"timestamp":170002000,"type":4,"data":{"href":"http://localhost:3001/"}},{"timestamp":170003000,"type":4,"data":{"href":"http://localhost:3001/"}}]}\n'
)
await sm1.stop()
await sm2.stop()
})
it('uploads a gzip compressed file to S3', async () => {
await sessionManager.add(
createIncomingRecordingMessage({
eventsRange: {
start: 170000000,
end: 170000000,
},
eventsByWindowId: {
window_id_1: [{ timestamp: 170000000, type: 4, data: { href: 'http://localhost:3001/' } }],
},
})
)
await sessionManager.flush(true)
const mockUploadCalls = (Upload as unknown as jest.Mock).mock.calls
expect(mockUploadCalls[0][0]).toMatchObject({
params: {
Bucket: 'posthog',
Key: `session_recordings/team_id/1/session_id/${sessionManager.context.sessionId}/data/170000000-170000000.jsonl`,
ContentEncoding: 'gzip',
ContentType: 'application/jsonl',
},
})
const uploadBody = mockUploadCalls[0][0].params.Body
expect(uploadBody).toBeInstanceOf(PassThrough)
// Extract the content from the stream and gzip decompress it
const chunks: Uint8Array[] = []
for await (const chunk of uploadBody) {
chunks.push(chunk)
}
const buffer = Buffer.concat(chunks)
const uncompressed = zlib.gunzipSync(buffer).toString('utf-8')
expect(uncompressed).toEqual(
'{"window_id":"window_id_1","data":[{"timestamp":170000000,"type":4,"data":{"href":"http://localhost:3001/"}}]}\n'
)
})
it('handles a corrupted metadata.json file', async () => {
const sm1 = await createSessionManager('session_id_2', 2, 2)
await sm1.add(
createIncomingRecordingMessage({
eventsByWindowId: {
window_id_1: [
{ timestamp: 170000000, type: 4, data: { href: 'http://localhost:3001/' } },
{ timestamp: 170000000 + 1000, type: 4, data: { href: 'http://localhost:3001/' } },
],
},
})
)
await sm1.stop()
await fs.writeFile(`${sm1.context.dir}/metadata.json`, 'CORRUPTEDDD', 'utf-8')
const sm2 = await createSessionManager('session_id_2', 2, 2)
expect(sm2.buffer).toEqual({
count: 1,
createdAt: expect.any(Number),
eventsRange: {
firstTimestamp: expect.any(Number),
lastTimestamp: expect.any(Number),
},
sizeEstimate: 185,
})
expect(sm2.buffer?.createdAt).toBeGreaterThanOrEqual(0)
expect(sm2.buffer?.eventsRange?.firstTimestamp).toBe(sm2.buffer!.createdAt)
expect(sm2.buffer?.eventsRange?.lastTimestamp).toBeGreaterThanOrEqual(sm2.buffer!.eventsRange!.firstTimestamp)
})
})


@@ -1,369 +0,0 @@
import { randomUUID } from 'crypto'
import fs from 'fs/promises'
import { mkdirSync, rmSync } from 'node:fs'
import { TopicPartition, TopicPartitionOffset } from 'node-rdkafka'
import path from 'path'
import { waitForExpect } from '../../../../functional_tests/expectations'
import { defaultConfig } from '../../../../src/config/config'
import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../../src/config/kafka-topics'
import {
SessionManagerBufferContext,
SessionManagerContext,
} from '../../../../src/main/ingestion-queues/session-recording/services/session-manager-v3'
import { SessionRecordingIngesterV3 } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-consumer-v3'
import { Hub, PluginsServerConfig, Team } from '../../../../src/types'
import { createHub } from '../../../../src/utils/db/hub'
import { getFirstTeam, resetTestDatabase } from '../../../helpers/sql'
import { createIncomingRecordingMessage, createKafkaMessage, createTP } from './fixtures'
const SESSION_RECORDING_REDIS_PREFIX = '@posthog-tests/replay/'
const tmpDir = path.join(__dirname, '../../../../.tmp/test_session_recordings')
const noop = () => undefined
const config: PluginsServerConfig = {
...defaultConfig,
SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION: true,
SESSION_RECORDING_REDIS_PREFIX,
SESSION_RECORDING_LOCAL_DIRECTORY: tmpDir,
}
async function deleteKeysWithPrefix(hub: Hub) {
const redisClient = await hub.redisPool.acquire()
const keys = await redisClient.keys(`${SESSION_RECORDING_REDIS_PREFIX}*`)
const pipeline = redisClient.pipeline()
keys.forEach(function (key) {
pipeline.del(key)
})
await pipeline.exec()
await hub.redisPool.release(redisClient)
}
const mockConsumer = {
on: jest.fn(),
commitSync: jest.fn(),
commit: jest.fn(),
queryWatermarkOffsets: jest.fn(),
committed: jest.fn(),
assignments: jest.fn(),
isConnected: jest.fn(() => true),
getMetadata: jest.fn(),
}
jest.mock('../../../../src/kafka/batch-consumer', () => {
return {
startBatchConsumer: jest.fn(() =>
Promise.resolve({
join: () => ({
finally: jest.fn(),
}),
stop: jest.fn(),
consumer: mockConsumer,
})
),
}
})
jest.setTimeout(1000)
describe('ingester', () => {
let ingester: SessionRecordingIngesterV3
let hub: Hub
let closeHub: () => Promise<void>
let team: Team
let teamToken = ''
let mockOffsets: Record<number, number> = {}
let mockCommittedOffsets: Record<number, number> = {}
const consumedTopic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS
beforeAll(async () => {
mkdirSync(path.join(config.SESSION_RECORDING_LOCAL_DIRECTORY, 'session-buffer-files'), { recursive: true })
await resetTestDatabase()
})
beforeEach(async () => {
if (await fs.stat(tmpDir).catch(() => null)) {
await fs.rmdir(tmpDir, { recursive: true })
}
// The below mocks simulate committing to kafka and querying the offsets
mockCommittedOffsets = {}
mockOffsets = {}
mockConsumer.commit.mockImplementation(
(tpo: TopicPartitionOffset) => (mockCommittedOffsets[tpo.partition] = tpo.offset)
)
mockConsumer.queryWatermarkOffsets.mockImplementation((_topic, partition, _timeout, cb) => {
cb(null, { highOffset: mockOffsets[partition] ?? 1, lowOffset: 0 })
})
mockConsumer.getMetadata.mockImplementation((options, cb) => {
cb(null, {
topics: [{ name: options.topic, partitions: [{ id: 0 }, { id: 1 }, { id: 2 }] }],
})
})
mockConsumer.committed.mockImplementation((topicPartitions: TopicPartition[], _timeout, cb) => {
const tpos: TopicPartitionOffset[] = topicPartitions.map((tp) => ({
topic: tp.topic,
partition: tp.partition,
offset: mockCommittedOffsets[tp.partition] ?? 1,
}))
cb(null, tpos)
})
;[hub, closeHub] = await createHub()
team = await getFirstTeam(hub)
teamToken = team.api_token
await deleteKeysWithPrefix(hub)
ingester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage)
await ingester.start()
mockConsumer.assignments.mockImplementation(() => [createTP(0, consumedTopic), createTP(1, consumedTopic)])
})
afterEach(async () => {
jest.setTimeout(10000)
await deleteKeysWithPrefix(hub)
await closeHub()
})
afterAll(() => {
rmSync(config.SESSION_RECORDING_LOCAL_DIRECTORY, { recursive: true, force: true })
jest.useRealTimers()
})
const createMessage = (session_id: string, partition = 1) => {
mockOffsets[partition] = mockOffsets[partition] ?? 0
mockOffsets[partition]++
return createKafkaMessage(
consumedTopic,
teamToken,
{
partition,
offset: mockOffsets[partition],
},
{
$session_id: session_id,
}
)
}
// disconnecting a producer is not safe to call multiple times
// in order to let us test stopping the ingester elsewhere
// in most tests we automatically stop the ingester during teardown
describe('when ingester.stop is called in teardown', () => {
afterEach(async () => {
await ingester.stop()
})
it('can parse debug partition config', () => {
const config = {
SESSION_RECORDING_DEBUG_PARTITION: '103',
KAFKA_HOSTS: 'localhost:9092',
} satisfies Partial<PluginsServerConfig> as PluginsServerConfig
const ingester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage)
expect(ingester['debugPartition']).toEqual(103)
})
it('can parse absence of debug partition config', () => {
const config = {
KAFKA_HOSTS: 'localhost:9092',
} satisfies Partial<PluginsServerConfig> as PluginsServerConfig
const ingester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage)
expect(ingester['debugPartition']).toBeUndefined()
})
it('creates a new session manager if needed', async () => {
const event = createIncomingRecordingMessage()
await ingester.consume(event)
await waitForExpect(() => {
expect(Object.keys(ingester.sessions).length).toBe(1)
expect(ingester.sessions['1__session_id_1']).toBeDefined()
})
})
it('handles multiple incoming sessions', async () => {
const event = createIncomingRecordingMessage()
const event2 = createIncomingRecordingMessage({
session_id: 'session_id_2',
})
await Promise.all([ingester.consume(event), ingester.consume(event2)])
expect(Object.keys(ingester.sessions).length).toBe(2)
expect(ingester.sessions['1__session_id_1']).toBeDefined()
expect(ingester.sessions['1__session_id_2']).toBeDefined()
})
it('handles parallel ingestion of the same session', async () => {
const event = createIncomingRecordingMessage()
const event2 = createIncomingRecordingMessage()
await Promise.all([ingester.consume(event), ingester.consume(event2)])
expect(Object.keys(ingester.sessions).length).toBe(1)
expect(ingester.sessions['1__session_id_1']).toBeDefined()
})
it('destroys a session manager if finished', async () => {
const sessionId = `destroys-a-session-manager-if-finished-${randomUUID()}`
const event = createIncomingRecordingMessage({
session_id: sessionId,
})
await ingester.consume(event)
expect(ingester.sessions[`1__${sessionId}`]).toBeDefined()
ingester.sessions[`1__${sessionId}`].buffer!.createdAt = 0
await ingester.flushAllReadySessions(() => undefined)
await waitForExpect(() => {
expect(ingester.sessions[`1__${sessionId}`]).not.toBeDefined()
}, 10000)
})
describe('batch event processing', () => {
it('should batch parse incoming events and batch them to reduce writes', async () => {
mockConsumer.assignments.mockImplementation(() => [createTP(1, consumedTopic)])
await ingester.handleEachBatch(
[
createMessage('session_id_1', 1),
createMessage('session_id_1', 1),
createMessage('session_id_1', 1),
createMessage('session_id_2', 1),
],
noop
)
expect(ingester.sessions[`${team.id}__session_id_1`].buffer?.count).toBe(1)
expect(ingester.sessions[`${team.id}__session_id_2`].buffer?.count).toBe(1)
let fileContents = await fs.readFile(
path.join(ingester.sessions[`${team.id}__session_id_1`].context.dir, 'buffer.jsonl'),
'utf-8'
)
expect(JSON.parse(fileContents).data).toHaveLength(3)
fileContents = await fs.readFile(
path.join(ingester.sessions[`${team.id}__session_id_2`].context.dir, 'buffer.jsonl'),
'utf-8'
)
expect(JSON.parse(fileContents).data).toHaveLength(1)
})
})
describe('simulated rebalancing', () => {
let otherIngester: SessionRecordingIngesterV3
jest.setTimeout(5000) // Increased to cover lock delay
beforeEach(async () => {
otherIngester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage)
await otherIngester.start()
})
afterEach(async () => {
await otherIngester.stop()
})
const getSessions = (
ingester: SessionRecordingIngesterV3
): (SessionManagerContext & SessionManagerBufferContext)[] =>
Object.values(ingester.sessions).map((x) => ({ ...x.context, ...x.buffer! }))
/**
* It is really hard to actually do rebalance tests against kafka, so we instead simulate the various methods and ensure the correct logic occurs
* Simulates the rebalance and tests that the handled sessions are successfully dropped and picked up
*/
it('rebalances new consumers', async () => {
const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)]
const partitionMsgs2 = [createMessage('session_id_3', 2), createMessage('session_id_4', 2)]
mockConsumer.assignments.mockImplementation(() => [
createTP(1, consumedTopic),
createTP(2, consumedTopic),
createTP(3, consumedTopic),
])
await ingester.handleEachBatch([...partitionMsgs1, ...partitionMsgs2], noop)
expect(getSessions(ingester)).toMatchObject([
{ sessionId: 'session_id_1', partition: 1, count: 1 },
{ sessionId: 'session_id_2', partition: 1, count: 1 },
{ sessionId: 'session_id_3', partition: 2, count: 1 },
{ sessionId: 'session_id_4', partition: 2, count: 1 },
])
// Call handleEachBatch with both consumers - we simulate the assignments which
// is what is responsible for the actual syncing of the sessions
mockConsumer.assignments.mockImplementation(() => [
createTP(2, consumedTopic),
createTP(3, consumedTopic),
])
await otherIngester.handleEachBatch(
[createMessage('session_id_4', 2), createMessage('session_id_5', 2)],
noop
)
mockConsumer.assignments.mockImplementation(() => [createTP(1, consumedTopic)])
await ingester.handleEachBatch([createMessage('session_id_1', 1)], noop)
// Should still have the partition 1 sessions that didn't move, with added events
expect(getSessions(ingester)).toMatchObject([
{ sessionId: 'session_id_1', partition: 1, count: 2 },
{ sessionId: 'session_id_2', partition: 1, count: 1 },
])
expect(getSessions(otherIngester)).toMatchObject([
{ sessionId: 'session_id_3', partition: 2, count: 1 },
{ sessionId: 'session_id_4', partition: 2, count: 2 },
{ sessionId: 'session_id_5', partition: 2, count: 1 },
])
})
})
describe('when a team is disabled', () => {
it('ignores invalid teams', async () => {
// non-zero offset because the code can't commit offset 0
await ingester.handleEachBatch(
[
createKafkaMessage(consumedTopic, 'invalid_token', { offset: 12 }),
createKafkaMessage(consumedTopic, 'invalid_token', { offset: 13 }),
],
noop
)
expect(ingester.sessions).toEqual({})
})
})
describe('heartbeats', () => {
it('it should send them whilst processing', async () => {
const heartbeat = jest.fn()
// non-zero offset because the code can't commit offset 0
const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)]
await ingester.handleEachBatch(partitionMsgs1, heartbeat)
expect(heartbeat).toBeCalledTimes(5)
})
})
})
describe('when ingester.stop is not called in teardown', () => {
describe('stop()', () => {
const setup = async (): Promise<void> => {
const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)]
await ingester.handleEachBatch(partitionMsgs1, noop)
}
it('shuts down without error', async () => {
await setup()
await expect(ingester.stop()).resolves.toMatchObject([
// destroy sessions,
{ status: 'fulfilled' },
])
})
})
})
})


@@ -1,7 +1,6 @@
import { Settings } from 'luxon'
import { Message, MessageHeader } from 'node-rdkafka'
import { IncomingRecordingMessage } from '../../../../src/main/ingestion-queues/session-recording/types'
import {
allSettledWithConcurrency,
getLagMultiplier,
@@ -9,7 +8,6 @@ import {
minDefined,
parseKafkaBatch,
parseKafkaMessage,
reduceRecordingMessages,
} from '../../../../src/main/ingestion-queues/session-recording/utils'
import { KafkaProducerWrapper } from '../../../../src/utils/db/kafka-producer-wrapper'
import { UUIDT } from '../../../../src/utils/utils'
@@ -716,67 +714,6 @@ describe('session-recording utils', () => {
})
})
describe('reduceMessages', () => {
const messages: IncomingRecordingMessage[] = [
// Should merge
{
distinct_id: '1',
eventsRange: { start: 1, end: 1 },
eventsByWindowId: { window_1: [{ timestamp: 1, type: 1, data: {} }] },
metadata: { lowOffset: 1, highOffset: 1, partition: 1, timestamp: 1, topic: 'the_topic', rawSize: 5 },
session_id: '1',
team_id: 1,
snapshot_source: null,
},
{
distinct_id: '1',
eventsRange: { start: 2, end: 2 },
eventsByWindowId: { window_1: [{ timestamp: 2, type: 2, data: {} }] },
metadata: { lowOffset: 2, highOffset: 2, partition: 1, timestamp: 2, topic: 'the_topic', rawSize: 4 },
session_id: '1',
team_id: 1,
snapshot_source: null,
},
// Different window_id but should still merge
{
distinct_id: '1',
eventsRange: { start: 3, end: 3 },
eventsByWindowId: { window_2: [{ timestamp: 3, type: 3, data: {} }] },
metadata: { lowOffset: 3, highOffset: 3, partition: 1, timestamp: 3, topic: 'the_topic', rawSize: 3 },
session_id: '1',
team_id: 1,
snapshot_source: null,
},
// different team
{
distinct_id: '1',
eventsRange: { start: 4, end: 4 },
eventsByWindowId: { window_1: [{ timestamp: 4, type: 4, data: {} }] },
metadata: { lowOffset: 4, highOffset: 4, partition: 1, timestamp: 4, topic: 'the_topic', rawSize: 30 },
session_id: '1',
team_id: 2,
snapshot_source: null,
},
// Different session_id
{
distinct_id: '1',
eventsRange: { start: 5, end: 5 },
eventsByWindowId: { window_1: [{ timestamp: 5, type: 5, data: {} }] },
metadata: { lowOffset: 5, highOffset: 5, partition: 1, timestamp: 5, topic: 'the_topic', rawSize: 31 },
session_id: '2',
team_id: 1,
snapshot_source: null,
},
]
// Call it once already to make sure that it doesn't mutate the input
expect(reduceRecordingMessages(messages)).toHaveLength(3)
const reduced = reduceRecordingMessages(messages)
expect(reduceRecordingMessages(messages)).toMatchSnapshot()
expect(reduced[0].eventsRange).toEqual({ start: 1, end: 3 })
expect(reduced[0].metadata).toMatchObject({ lowOffset: 1, highOffset: 3 })
})
describe('allSettledWithConcurrency', () => {
jest.setTimeout(1000)
it('should resolve promises in parallel with a max consumption', async () => {