
feat: Replay ingestion with EFS (#20487)

Ben White 2024-02-26 09:43:27 +01:00 committed by GitHub
parent 66933ca632
commit bcbe300c22
15 changed files with 1539 additions and 213 deletions

.vscode/launch.json vendored
View File

@ -72,7 +72,8 @@
"DATABASE_URL": "postgres://posthog:posthog@localhost:5432/posthog",
"SKIP_SERVICE_VERSION_REQUIREMENTS": "1",
"PRINT_SQL": "1",
"BILLING_SERVICE_URL": "https://billing.dev.posthog.dev"
"BILLING_SERVICE_URL": "https://billing.dev.posthog.dev",
"RECORDINGS_INGESTER_URL": "http://localhost:6738"
},
"console": "integratedTerminal",
"python": "${workspaceFolder}/env/bin/python",

View File

@ -7,6 +7,7 @@ trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT
export DEBUG=${DEBUG:-1}
export SKIP_SERVICE_VERSION_REQUIREMENTS=1
export BILLING_SERVICE_URL=${BILLING_SERVICE_URL:-https://billing.dev.posthog.dev}
export RECORDINGS_INGESTER_URL=${RECORDINGS_INGESTER_URL:-http://localhost:6738}
service_warning() {
echo -e "\033[0;31m$1 isn't ready. You can run the stack with:\ndocker compose -f docker-compose.dev.yml up\nIf you have already run that, just make sure that services are starting properly, and sit back.\nWaiting for $1 to start...\033[0m"

View File

@ -74,6 +74,8 @@ services:
SENTRY_DSN: $SENTRY_DSN
SITE_URL: https://$DOMAIN
SECRET_KEY: $POSTHOG_SECRET
RECORDINGS_INGESTER_URL: http://plugins:6738
plugins:
extends:
file: docker-compose.base.yml

View File

@ -8,4 +8,5 @@ module.exports = {
setupFilesAfterEnv: ['./jest.setup.fetch-mock.js'],
testMatch: ['<rootDir>/tests/**/*.test.ts'],
testTimeout: 60000,
modulePathIgnorePatterns: ['<rootDir>/.tmp/'],
}

View File

@ -1,5 +1,5 @@
import { PluginServerCapabilities, PluginServerMode, PluginsServerConfig, stringToPluginServerMode } from './types'
import { isTestEnv } from './utils/env-utils'
import { isDevEnv, isTestEnv } from './utils/env-utils'
export function getPluginServerCapabilities(config: PluginsServerConfig): PluginServerCapabilities {
const mode: PluginServerMode | null = config.PLUGIN_SERVER_MODE
@ -19,6 +19,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
processAsyncOnEventHandlers: true,
processAsyncWebhooksHandlers: true,
sessionRecordingBlobIngestion: true,
sessionRecordingV3Ingestion: isDevEnv(),
personOverrides: true,
appManagementSingleton: true,
preflightSchedules: true,
@ -55,6 +56,12 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
sessionRecordingBlobIngestion: true,
...sharedCapabilities,
}
case PluginServerMode.recordings_ingestion_v3:
return {
sessionRecordingV3Ingestion: true,
...sharedCapabilities,
}
case PluginServerMode.async_onevent:
return {
processAsyncOnEventHandlers: true,

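For orientation, a minimal sketch of how the new mode selects the capability (hypothetical wiring: the exact shape of `config` and the parsing done by `stringToPluginServerMode` are assumed, not spelled out in this diff):

// Hypothetical: start the plugin server with PLUGIN_SERVER_MODE=recordings-ingestion-v3.
// getPluginServerCapabilities then returns only the v3 recordings capability plus the shared ones.
const capabilities = getPluginServerCapabilities({
    ...config,
    PLUGIN_SERVER_MODE: 'recordings-ingestion-v3',
})
// => { sessionRecordingV3Ingestion: true, ...sharedCapabilities }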
View File

@ -1,14 +1,7 @@
import { Properties } from '@posthog/plugin-scaffold'
import { captureException } from '@sentry/node'
import { DateTime } from 'luxon'
import {
ClickHouseTimestamp,
PerformanceEventReverseMapping,
RawPerformanceEvent,
RRWebEvent,
TimestampFormat,
} from '../../../types'
import { ClickHouseTimestamp, RRWebEvent, TimestampFormat } from '../../../types'
import { status } from '../../../utils/status'
import { castTimestampOrNow } from '../../../utils/utils'
import { activeMilliseconds } from './snapshot-segmenter'
@ -271,23 +264,3 @@ export const createSessionReplayEvent = (
return data
}
export function createPerformanceEvent(uuid: string, team_id: number, distinct_id: string, properties: Properties) {
const data: Partial<RawPerformanceEvent> = {
uuid,
team_id: team_id,
distinct_id: distinct_id,
session_id: properties['$session_id'],
window_id: properties['$window_id'],
pageview_id: properties['$pageview_id'],
current_url: properties['$current_url'],
}
Object.entries(PerformanceEventReverseMapping).forEach(([key, value]) => {
if (key in properties) {
data[value] = properties[key]
}
})
return data
}

View File

@ -0,0 +1,477 @@
import { Upload } from '@aws-sdk/lib-storage'
import { captureException, captureMessage } from '@sentry/node'
import { createReadStream, createWriteStream, WriteStream } from 'fs'
import { mkdir, readdir, readFile, rename, rmdir, stat, unlink, writeFile } from 'fs/promises'
import path from 'path'
import { Counter, Histogram } from 'prom-client'
import { PassThrough } from 'stream'
import { pipeline } from 'stream/promises'
import * as zlib from 'zlib'
import { PluginsServerConfig } from '../../../../types'
import { status } from '../../../../utils/status'
import { asyncTimeoutGuard } from '../../../../utils/timing'
import { ObjectStorage } from '../../../services/object_storage'
import { IncomingRecordingMessage } from '../types'
import { convertToPersistedMessage, maxDefined, minDefined, now } from '../utils'
const BUCKETS_LINES_WRITTEN = [0, 10, 50, 100, 500, 1000, 2000, 5000, 10000, Infinity]
export const BUCKETS_KB_WRITTEN = [0, 128, 512, 1024, 5120, 10240, 20480, 51200, 102400, 204800, Infinity]
const S3_UPLOAD_WARN_TIME_SECONDS = 2 * 60 * 1000
// NOTE: To remove once released
const metricPrefix = 'v3_'
export const FILE_EXTENSION = '.jsonl'
export const BUFFER_FILE_NAME = `buffer${FILE_EXTENSION}`
export const FLUSH_FILE_EXTENSION = `.flush${FILE_EXTENSION}`
const counterS3FilesWritten = new Counter({
name: metricPrefix + 'recording_s3_files_written',
help: 'A single file flushed to S3',
labelNames: ['flushReason'],
})
const counterS3WriteErrored = new Counter({
name: metricPrefix + 'recording_s3_write_errored',
help: 'Indicates that we failed to flush to S3 without recovering',
})
const histogramS3LinesWritten = new Histogram({
name: metricPrefix + 'recording_s3_lines_written_histogram',
help: 'The number of lines in a file we send to s3',
buckets: BUCKETS_LINES_WRITTEN,
})
const histogramS3KbWritten = new Histogram({
name: metricPrefix + 'recording_blob_ingestion_s3_kb_written',
help: 'The uncompressed size of file we send to S3',
buckets: BUCKETS_KB_WRITTEN,
})
const histogramSessionAgeSeconds = new Histogram({
name: metricPrefix + 'recording_blob_ingestion_session_age_seconds',
help: 'The age of current sessions in seconds',
buckets: [0, 60, 60 * 2, 60 * 5, 60 * 8, 60 * 10, 60 * 12, 60 * 15, 60 * 20, Infinity],
})
const histogramSessionSizeKb = new Histogram({
name: metricPrefix + 'recording_blob_ingestion_session_size_kb',
help: 'The size of current sessions in kb',
buckets: BUCKETS_KB_WRITTEN,
})
const histogramFlushTimeSeconds = new Histogram({
name: metricPrefix + 'recording_blob_ingestion_session_flush_time_seconds',
help: 'The time taken to flush a session in seconds',
buckets: [0, 2, 5, 10, 20, 30, 60, 120, 180, 300, Infinity],
})
const histogramSessionSize = new Histogram({
name: metricPrefix + 'recording_blob_ingestion_session_lines',
help: 'The size of sessions in numbers of lines',
buckets: BUCKETS_LINES_WRITTEN,
})
const writeStreamBlocked = new Counter({
name: metricPrefix + 'recording_blob_ingestion_write_stream_blocked',
help: 'Number of times we get blocked by the stream backpressure',
})
export type SessionManagerBufferContext = {
sizeEstimate: number
count: number
eventsRange: {
firstTimestamp: number
lastTimestamp: number
} | null
createdAt: number
}
export type SessionBuffer = {
context: SessionManagerBufferContext
fileStream: WriteStream
}
// Context that is updated and persisted to disk so must be serializable
export type SessionManagerContext = {
dir: string
sessionId: string
teamId: number
partition: number
}
export class SessionManagerV3 {
buffer?: SessionBuffer
flushPromise?: Promise<void>
destroying = false
inProgressUpload: Upload | null = null
flushJitterMultiplier: number
constructor(
public readonly serverConfig: PluginsServerConfig,
public readonly s3Client: ObjectStorage['s3'],
public readonly context: SessionManagerContext
) {
// We add a jitter multiplier to the buffer age so that we don't have all sessions flush at the same time
this.flushJitterMultiplier = 1 - Math.random() * serverConfig.SESSION_RECORDING_BUFFER_AGE_JITTER
}
private file(name: string): string {
return path.join(this.context.dir, name)
}
public static async create(
serverConfig: PluginsServerConfig,
s3Client: ObjectStorage['s3'],
context: SessionManagerContext
): Promise<SessionManagerV3> {
const manager = new SessionManagerV3(serverConfig, s3Client, context)
await mkdir(context.dir, { recursive: true })
try {
const fileExists = await stat(manager.file('metadata.json')).then(
() => true,
() => false
)
if (fileExists) {
const bufferMetadata: SessionManagerBufferContext = JSON.parse(
await readFile(manager.file('metadata.json'), 'utf-8')
)
manager.buffer = {
context: bufferMetadata,
fileStream: manager.createFileStreamFor(path.join(context.dir, BUFFER_FILE_NAME)),
}
}
} catch (error) {
// Indicates no buffer metadata file or it's corrupted
status.error('🧨', '[session-manager] failed to read buffer metadata', {
...context,
error,
})
}
status.info('📦', '[session-manager] started new manager', {
...manager.context,
...(manager.buffer?.context ?? {}),
})
return manager
}
private async syncMetadata(): Promise<void> {
if (this.buffer) {
await writeFile(this.file('metadata.json'), JSON.stringify(this.buffer?.context), 'utf-8')
} else {
await unlink(this.file('metadata.json'))
}
}
private async getFlushFiles(): Promise<string[]> {
return (await readdir(this.context.dir)).filter((file) => file.endsWith(FLUSH_FILE_EXTENSION))
}
private captureException(error: Error, extra: Record<string, any> = {}): void {
captureException(error, {
extra: { ...this.context, ...extra },
tags: { teamId: this.context.teamId, sessionId: this.context.sessionId },
})
}
private captureMessage(message: string, extra: Record<string, any> = {}): void {
const context = this.context
captureMessage(message, {
extra: { ...context, ...extra },
tags: { teamId: context.teamId, sessionId: context.sessionId },
})
}
public async add(message: IncomingRecordingMessage): Promise<void> {
if (this.destroying) {
return
}
try {
const buffer = this.getOrCreateBuffer()
const messageData = convertToPersistedMessage(message)
const start = message.events.at(0)?.timestamp
const end = message.events.at(-1)?.timestamp ?? start
if (!start || !end) {
captureMessage("[session-manager]: can't set events range from message without events summary", {
extra: { message },
})
return
}
buffer.context.eventsRange = {
firstTimestamp: minDefined(start, buffer.context.eventsRange?.firstTimestamp) ?? start,
lastTimestamp: maxDefined(end, buffer.context.eventsRange?.lastTimestamp) ?? end,
}
const content = JSON.stringify(messageData) + '\n'
buffer.context.count += 1
buffer.context.sizeEstimate += content.length
if (!buffer.fileStream.write(content, 'utf-8')) {
writeStreamBlocked.inc()
await new Promise((r) => buffer.fileStream.once('drain', r))
}
await this.syncMetadata()
} catch (error) {
this.captureException(error, { message })
throw error
}
}
public async isEmpty(): Promise<boolean> {
return !this.buffer?.context.count && !(await this.getFlushFiles()).length
}
public async flush(force = false): Promise<void> {
if (this.destroying) {
return
}
if (!force) {
await this.maybeFlushCurrentBuffer()
} else {
// This is mostly used by tests
await this.markCurrentBufferForFlush('rebalance')
}
await this.flushFiles()
}
private async maybeFlushCurrentBuffer(): Promise<void> {
if (!this.buffer) {
return
}
if (this.buffer.context.sizeEstimate >= this.serverConfig.SESSION_RECORDING_MAX_BUFFER_SIZE_KB * 1024) {
return this.markCurrentBufferForFlush('buffer_size')
}
const flushThresholdMs = this.serverConfig.SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS * 1000
const flushThresholdJitteredMs = flushThresholdMs * this.flushJitterMultiplier
const logContext: Record<string, any> = {
...this.context,
flushThresholdMs,
flushThresholdJitteredMs,
}
if (!this.buffer.context.count) {
status.warn('🚽', `[session-manager] buffer has no items yet`, { logContext })
return
}
const bufferAgeInMemoryMs = now() - this.buffer.context.createdAt
// check the in-memory age against a larger value than the flush threshold,
// otherwise we'll flap between reasons for flushing when close to real-time processing
const isSessionAgeOverThreshold = bufferAgeInMemoryMs >= flushThresholdJitteredMs
logContext['bufferAgeInMemoryMs'] = bufferAgeInMemoryMs
logContext['isSessionAgeOverThreshold'] = isSessionAgeOverThreshold
histogramSessionAgeSeconds.observe(bufferAgeInMemoryMs / 1000)
histogramSessionSize.observe(this.buffer.context.count)
histogramSessionSizeKb.observe(this.buffer.context.sizeEstimate / 1024)
if (isSessionAgeOverThreshold) {
return this.markCurrentBufferForFlush('buffer_age')
}
}
private async markCurrentBufferForFlush(reason: 'buffer_size' | 'buffer_age' | 'rebalance'): Promise<void> {
const buffer = this.buffer
if (!buffer) {
// TODO: maybe error properly here?
return
}
if (!buffer.context.eventsRange || !buffer.context.count) {
// Indicates some issue with the buffer so we can close out
this.buffer = undefined
return
}
// ADD FLUSH METRICS HERE
const { firstTimestamp, lastTimestamp } = buffer.context.eventsRange
const fileName = `${firstTimestamp}-${lastTimestamp}${FLUSH_FILE_EXTENSION}`
counterS3FilesWritten.labels(reason).inc(1)
histogramS3LinesWritten.observe(buffer.context.count)
histogramS3KbWritten.observe(buffer.context.sizeEstimate / 1024)
// NOTE: We simplify things by keeping the same base file name locally and for S3
await new Promise((resolve) => buffer.fileStream.end(resolve))
await rename(this.file(BUFFER_FILE_NAME), this.file(fileName))
this.buffer = undefined
await this.syncMetadata()
}
private async flushFiles(): Promise<void> {
// We read all files marked for flushing and write them to S3
const filesToFlush = await this.getFlushFiles()
await Promise.all(filesToFlush.map((file) => this.flushFile(file)))
}
private async flushFile(filename: string): Promise<void> {
status.info('🚽', '[session-manager] flushing file to S3', {
filename,
...this.context,
})
if (this.destroying) {
status.warn('🚽', '[session-manager] flush called but we are in a destroying state', {
...this.context,
})
return
}
const file = this.file(filename)
const deleteFile = async () => {
await stat(file)
await unlink(file)
}
const endFlushTimer = histogramFlushTimeSeconds.startTimer()
try {
const targetFileName = filename.replace(FLUSH_FILE_EXTENSION, FILE_EXTENSION)
const baseKey = `${this.serverConfig.SESSION_RECORDING_REMOTE_FOLDER}/team_id/${this.context.teamId}/session_id/${this.context.sessionId}`
const dataKey = `${baseKey}/data/${targetFileName}`
const readStream = createReadStream(file)
const uploadStream = new PassThrough()
// The compressed file
pipeline(readStream, zlib.createGzip(), uploadStream).catch((error) => {
// TODO: If this actually happens we probably want to destroy the buffer as we will be stuck...
status.error('🧨', '[session-manager] writestream errored', {
...this.context,
error,
})
this.captureException(error)
})
readStream.on('error', (err) => {
// TODO: What should we do here?
status.error('🧨', '[session-manager] readstream errored', {
...this.context,
error: err,
})
this.captureException(err)
})
const inProgressUpload = (this.inProgressUpload = new Upload({
client: this.s3Client,
params: {
Bucket: this.serverConfig.OBJECT_STORAGE_BUCKET,
Key: dataKey,
ContentEncoding: 'gzip',
ContentType: 'application/json',
Body: uploadStream,
},
}))
await asyncTimeoutGuard(
{
message: 'session-manager.flush uploading file to S3 delayed.',
timeout: S3_UPLOAD_WARN_TIME_SECONDS,
},
async () => {
await inProgressUpload.done()
}
)
} catch (error: any) {
// TRICKY: error can for some reason sometimes be undefined...
error = error || new Error('Unknown Error')
if (error.name === 'AbortError' && this.destroying) {
// abort of inProgressUpload while destroying is expected
return
}
await this.inProgressUpload?.abort()
// TODO: If we fail to write to S3 we should do something about it
status.error('🧨', '[session-manager] failed writing session recording blob to S3', {
errorMessage: `${error.name || 'Unknown Error Type'}: ${error.message}`,
error,
...this.context,
})
this.captureException(error)
counterS3WriteErrored.inc()
throw error
} finally {
endFlushTimer()
await deleteFile()
}
}
private getOrCreateBuffer(): SessionBuffer {
if (!this.buffer) {
try {
const context: SessionManagerBufferContext = {
sizeEstimate: 0,
count: 0,
eventsRange: null,
createdAt: now(),
}
const buffer: SessionBuffer = {
context,
fileStream: this.createFileStreamFor(this.file(BUFFER_FILE_NAME)),
}
this.buffer = buffer
} catch (error) {
this.captureException(error)
throw error
}
}
return this.buffer as SessionBuffer
}
protected createFileStreamFor(file: string): WriteStream {
return createWriteStream(file, {
// Opens in append mode in case it already exists
flags: 'a',
encoding: 'utf-8',
})
}
public async stop(): Promise<void> {
this.destroying = true
if (this.inProgressUpload !== null) {
await this.inProgressUpload.abort().catch((error) => {
status.error('🧨', '[session-manager][realtime] failed to abort in progress upload', {
...this.context,
error,
})
this.captureException(error)
})
this.inProgressUpload = null
}
const buffer = this.buffer
if (buffer) {
await new Promise((resolve) => buffer.fileStream.end(resolve))
}
if (await this.isEmpty()) {
status.info('🧨', '[session-manager] removing empty session directory', {
...this.context,
})
await rmdir(this.context.dir, { recursive: true })
}
}
}
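As a reading aid, a minimal usage sketch of the lifecycle the class above exposes (the `config`, `s3Client` and `message` values are hypothetical stand-ins, not part of this diff):

// Hypothetical wiring: config is a PluginsServerConfig, s3Client an ObjectStorage['s3'],
// and message an already-parsed IncomingRecordingMessage.
const manager = await SessionManagerV3.create(config, s3Client, {
    dir: '/mnt/efs/session-recordings/1/2__some-session-id', // illustrative path: partition 1, team 2
    sessionId: 'some-session-id',
    teamId: 2,
    partition: 1,
})
await manager.add(message) // appends one JSONL line to buffer.jsonl and persists metadata.json
await manager.flush() // renames aged/oversized buffers to *.flush.jsonl and gzips them to S3
await manager.stop() // aborts any in-flight upload and removes the directory once empty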

View File

@ -0,0 +1,458 @@
import { captureException } from '@sentry/node'
import { createReadStream } from 'fs'
import { readdir, stat } from 'fs/promises'
import { features, KafkaConsumer, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka'
import path from 'path'
import { Counter, Gauge, Histogram } from 'prom-client'
import { sessionRecordingConsumerConfig } from '../../../config/config'
import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics'
import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
import { PluginsServerConfig, TeamId } from '../../../types'
import { BackgroundRefresher } from '../../../utils/background-refresher'
import { PostgresRouter } from '../../../utils/db/postgres'
import { status } from '../../../utils/status'
import { fetchTeamTokensWithRecordings } from '../../../worker/ingestion/team-manager'
import { expressApp } from '../../services/http-server'
import { ObjectStorage } from '../../services/object_storage'
import { runInstrumentedFunction } from '../../utils'
import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics'
import { BUCKETS_KB_WRITTEN, BUFFER_FILE_NAME, SessionManagerV3 } from './services/session-manager-v3'
import { IncomingRecordingMessage } from './types'
import { parseKafkaMessage } from './utils'
// Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals
require('@sentry/tracing')
// WARNING: Do not change this - it will essentially reset the consumer
const KAFKA_CONSUMER_GROUP_ID = 'session-replay-ingester'
const KAFKA_CONSUMER_SESSION_TIMEOUT_MS = 30000
// NOTE: To remove once released
const metricPrefix = 'v3_'
const gaugeSessionsHandled = new Gauge({
name: metricPrefix + 'recording_blob_ingestion_session_manager_count',
help: 'A gauge of the number of sessions being handled by this blob ingestion consumer',
})
const histogramKafkaBatchSize = new Histogram({
name: metricPrefix + 'recording_blob_ingestion_kafka_batch_size',
help: 'The size of the batches we are receiving from Kafka',
buckets: [0, 50, 100, 150, 200, 250, 300, 350, 400, 450, 500, 600, Infinity],
})
const histogramKafkaBatchSizeKb = new Histogram({
name: metricPrefix + 'recording_blob_ingestion_kafka_batch_size_kb',
help: 'The size in kb of the batches we are receiving from Kafka',
buckets: BUCKETS_KB_WRITTEN,
})
const counterKafkaMessageReceived = new Counter({
name: metricPrefix + 'recording_blob_ingestion_kafka_message_received',
help: 'The number of messages we have received from Kafka',
labelNames: ['partition'],
})
export interface TeamIDWithConfig {
teamId: TeamId | null
consoleLogIngestionEnabled: boolean
}
/**
* The SessionRecordingIngesterV3
* relies on EFS network storage to avoid the need to delay kafka commits and instead uses the disk
* as the persistent volume for both blob data and the metadata around ingestion.
*/
export class SessionRecordingIngesterV3 {
// redisPool: RedisPool
sessions: Record<string, SessionManagerV3> = {}
// sessionHighWaterMarker: OffsetHighWaterMarker
// persistentHighWaterMarker: OffsetHighWaterMarker
// realtimeManager: RealtimeManager
// replayEventsIngester: ReplayEventsIngester
// consoleLogsIngester: ConsoleLogsIngester
batchConsumer?: BatchConsumer
// partitionMetrics: Record<number, PartitionMetrics> = {}
teamsRefresher: BackgroundRefresher<Record<string, TeamIDWithConfig>>
// latestOffsetsRefresher: BackgroundRefresher<Record<number, number | undefined>>
config: PluginsServerConfig
topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS
// totalNumPartitions = 0
private promises: Set<Promise<any>> = new Set()
// if ingestion is lagging on a single partition it is often hard to identify _why_,
// this allows us to output more information for that partition
private debugPartition: number | undefined = undefined
constructor(
globalServerConfig: PluginsServerConfig,
private postgres: PostgresRouter,
private objectStorage: ObjectStorage
) {
this.debugPartition = globalServerConfig.SESSION_RECORDING_DEBUG_PARTITION
? parseInt(globalServerConfig.SESSION_RECORDING_DEBUG_PARTITION)
: undefined
// NOTE: globalServerConfig contains the default pluginServer values, typically not pointing at dedicated resources like kafka or redis
// We still connect to some of the non-dedicated resources such as postgres or the Replay events kafka.
this.config = sessionRecordingConsumerConfig(globalServerConfig)
// this.redisPool = createRedisPool(this.config)
// NOTE: This is the only place where we need to use the shared server config
// TODO: Uncomment when we swap to using this service as the ingester for it
// this.replayEventsIngester = new ReplayEventsIngester(globalServerConfig, this.persistentHighWaterMarker)
// this.consoleLogsIngester = new ConsoleLogsIngester(globalServerConfig, this.persistentHighWaterMarker)
this.teamsRefresher = new BackgroundRefresher(async () => {
try {
status.info('🔁', 'session-replay-ingestion - refreshing teams in the background')
return await fetchTeamTokensWithRecordings(this.postgres)
} catch (e) {
status.error('🔥', 'session-replay-ingestion - failed to refresh teams in the background', e)
captureException(e)
throw e
}
})
}
private get rootDir() {
return path.join(this.config.SESSION_RECORDING_LOCAL_DIRECTORY, 'session-recordings')
}
private dirForSession(partition: number, teamId: number, sessionId: string): string {
return path.join(this.rootDir, `${partition}`, `${teamId}__${sessionId}`)
}
private get connectedBatchConsumer(): KafkaConsumer | undefined {
// Helper to only use the batch consumer if we are actually connected to it - otherwise it will throw errors
const consumer = this.batchConsumer?.consumer
return consumer && consumer.isConnected() ? consumer : undefined
}
private get assignedTopicPartitions(): TopicPartition[] {
return this.connectedBatchConsumer?.assignments() ?? []
}
private scheduleWork<T>(promise: Promise<T>): Promise<T> {
/**
* Helper to handle graceful shutdowns. Every time we do some work we add a promise to this array and remove it when finished.
* That way when shutting down we can wait for all promises to finish before exiting.
*/
this.promises.add(promise)
promise.finally(() => this.promises.delete(promise))
return promise
}
public async consume(event: IncomingRecordingMessage): Promise<void> {
const { team_id, session_id } = event
const key = `${team_id}__${session_id}`
const { offset, partition } = event.metadata
if (this.debugPartition === partition) {
status.info('🔁', '[session-replay-ingestion] - [PARTITION DEBUG] - consuming event', {
team_id,
session_id,
partition,
offset,
})
}
if (!this.sessions[key]) {
const { partition } = event.metadata
this.sessions[key] = await SessionManagerV3.create(this.config, this.objectStorage.s3, {
teamId: team_id,
sessionId: session_id,
dir: this.dirForSession(partition, team_id, session_id),
partition,
})
}
await this.sessions[key]?.add(event)
}
public async handleEachBatch(messages: Message[]): Promise<void> {
status.info('🔁', `session-replay-ingestion - handling batch`, {
size: messages.length,
partitionsInBatch: [...new Set(messages.map((x) => x.partition))],
assignedPartitions: this.assignedTopicPartitions.map((x) => x.partition),
sessionsHandled: Object.keys(this.sessions).length,
})
// TODO: For all assigned partitions, load up any sessions on disk that we don't already have in memory
// TODO: Add a timer or something to fire this "handleEachBatch" with an empty batch for quiet partitions
await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch`,
logExecutionTime: true,
func: async () => {
histogramKafkaBatchSize.observe(messages.length)
histogramKafkaBatchSizeKb.observe(messages.reduce((acc, m) => (m.value?.length ?? 0) + acc, 0) / 1024)
const recordingMessages: IncomingRecordingMessage[] = []
await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.parseKafkaMessages`,
func: async () => {
for (const message of messages) {
counterKafkaMessageReceived.inc({ partition: message.partition })
const recordingMessage = await parseKafkaMessage(message, (token) =>
this.teamsRefresher.get().then((teams) => ({
teamId: teams[token]?.teamId || null,
consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true,
}))
)
if (recordingMessage) {
recordingMessages.push(recordingMessage)
}
}
},
})
// await this.reportPartitionMetrics()
await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.ensureSessionsAreLoaded`,
func: async () => {
await this.syncSessionsWithDisk()
},
})
await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.consumeBatch`,
func: async () => {
for (const message of recordingMessages) {
await this.consume(message)
}
},
})
await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.flushAllReadySessions`,
func: async () => {
await this.flushAllReadySessions()
},
})
// await runInstrumentedFunction({
// statsKey: `recordingingester.handleEachBatch.consumeReplayEvents`,
// func: async () => {
// await this.replayEventsIngester.consumeBatch(recordingMessages)
// },
// })
// await runInstrumentedFunction({
// statsKey: `recordingingester.handleEachBatch.consumeConsoleLogEvents`,
// func: async () => {
// await this.consoleLogsIngester.consumeBatch(recordingMessages)
// },
// })
},
})
}
public async start(): Promise<void> {
status.info('🔁', 'session-replay-ingestion - starting session recordings blob consumer', {
librdKafkaVersion: librdkafkaVersion,
kafkaCapabilities: features,
})
this.setupHttpRoutes()
// Load teams into memory
await this.teamsRefresher.refresh()
// await this.replayEventsIngester.start()
// await this.consoleLogsIngester.start()
const connectionConfig = createRdConnectionConfigFromEnvVars(this.config)
// Create a node-rdkafka consumer that fetches batches of messages, runs
// eachBatchWithContext, then commits offsets for the batch.
this.batchConsumer = await startBatchConsumer({
connectionConfig,
groupId: KAFKA_CONSUMER_GROUP_ID,
topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
autoCommit: true, // NOTE: This is the crucial difference between this and the other consumer
sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS,
maxPollIntervalMs: this.config.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS,
// the largest size of a message that can be fetched by the consumer.
// the largest size our MSK cluster allows is 20MB
// we only use 9 or 10MB but there's no reason to limit this 🤷️
consumerMaxBytes: this.config.KAFKA_CONSUMPTION_MAX_BYTES,
consumerMaxBytesPerPartition: this.config.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
// our messages are very big, so we don't want to buffer too many
queuedMinMessages: this.config.SESSION_RECORDING_KAFKA_QUEUE_SIZE,
consumerMaxWaitMs: this.config.KAFKA_CONSUMPTION_MAX_WAIT_MS,
consumerErrorBackoffMs: this.config.KAFKA_CONSUMPTION_ERROR_BACKOFF_MS,
fetchBatchSize: this.config.SESSION_RECORDING_KAFKA_BATCH_SIZE,
batchingTimeoutMs: this.config.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS,
topicCreationTimeoutMs: this.config.KAFKA_TOPIC_CREATION_TIMEOUT_MS,
eachBatch: async (messages) => {
return await this.scheduleWork(this.handleEachBatch(messages))
},
})
addSentryBreadcrumbsEventListeners(this.batchConsumer.consumer)
this.batchConsumer.consumer.on('disconnected', async (err) => {
// since we can't be guaranteed that the consumer will be stopped before some other code calls disconnect
// we need to listen to disconnect and make sure we're stopped
status.info('🔁', 'session-replay-ingestion batch consumer disconnected, cleaning up', { err })
await this.stop()
})
}
public async stop(): Promise<PromiseSettledResult<any>[]> {
status.info('🔁', 'session-replay-ingestion - stopping')
// NOTE: We have to get the partitions before we stop the consumer as it throws if disconnected
// const assignedPartitions = this.assignedTopicPartitions
// Mark as stopping so that we don't actually process any more incoming messages, but still keep the process alive
await this.batchConsumer?.stop()
void this.scheduleWork(
Promise.allSettled(
Object.entries(this.sessions).map(([key, sessionManager]) => this.destroySession(key, sessionManager))
)
)
// void this.scheduleWork(this.replayEventsIngester.stop())
// void this.scheduleWork(this.consoleLogsIngester.stop())
const promiseResults = await Promise.allSettled(this.promises)
// Finally we clear up redis once we are sure everything else has been handled
// await this.redisPool.drain()
// await this.redisPool.clear()
status.info('👍', 'session-replay-ingestion - stopped!')
return promiseResults
}
public isHealthy() {
// TODO: Maybe extend this to check if we are shutting down so we don't get killed early.
return this.batchConsumer?.isHealthy()
}
async flushAllReadySessions(): Promise<void> {
const promises: Promise<void>[] = []
const assignedPartitions = this.assignedTopicPartitions.map((x) => x.partition)
for (const [key, sessionManager] of Object.entries(this.sessions)) {
if (!assignedPartitions.includes(sessionManager.context.partition)) {
promises.push(this.destroySession(key, sessionManager))
continue
}
const flushPromise = sessionManager
.flush()
.catch((err) => {
status.error(
'🚽',
'session-replay-ingestion - failed trying to flush on idle session: ' +
sessionManager.context.sessionId,
{
err,
session_id: sessionManager.context.sessionId,
}
)
captureException(err, { tags: { session_id: sessionManager.context.sessionId } })
})
.then(async () => {
// If the SessionManager is done (flushed and with no more queued events) then we remove it to free up memory
if (await sessionManager.isEmpty()) {
await this.destroySession(key, sessionManager)
}
})
promises.push(flushPromise)
}
await Promise.allSettled(promises)
gaugeSessionsHandled.set(Object.keys(this.sessions).length)
}
private async syncSessionsWithDisk(): Promise<void> {
// As we may get assigned and reassigned partitions, we want to make sure that we have all sessions loaded into memory
await Promise.all(
this.assignedTopicPartitions.map(async ({ partition }) => {
const keys = await readdir(path.join(this.rootDir, `${partition}`)).catch(() => {
// This happens if there are no files on disk for that partition yet
return []
})
// TODO: Below regex is a little crude. We should fix it
await Promise.all(
keys
.filter((x) => /\d+__[a-zA-Z0-9\-]+/.test(x))
.map(async (key) => {
// TODO: Ensure sessionId can only be a uuid
const [teamId, sessionId] = key.split('__')
if (!this.sessions[key]) {
this.sessions[key] = await SessionManagerV3.create(this.config, this.objectStorage.s3, {
teamId: parseInt(teamId),
sessionId,
dir: this.dirForSession(partition, parseInt(teamId), sessionId),
partition,
})
}
})
)
})
)
}
private async destroySession(key: string, sessionManager: SessionManagerV3): Promise<void> {
delete this.sessions[key]
await sessionManager.stop()
}
private setupHttpRoutes() {
// Mimic the app server's endpoint
expressApp.get('/api/projects/:projectId/session_recordings/:sessionId/snapshots', async (req, res) => {
const startTime = Date.now()
res.on('finish', function () {
status.info('⚡️', `GET ${req.url} - ${res.statusCode} - ${Date.now() - startTime}ms`)
})
// validate that projectId is a number and sessionId is UUID like
const projectId = parseInt(req.params.projectId)
if (isNaN(projectId)) {
res.sendStatus(404)
return
}
const sessionId = req.params.sessionId
if (!/^[0-9a-f-]+$/.test(sessionId)) {
res.sendStatus(404)
return
}
status.info('🔁', 'session-replay-ingestion - fetching session', { projectId, sessionId })
// We don't know the partition upfront so we have to recursively check all partitions
const partitions = await readdir(this.rootDir).catch(() => [])
for (const partition of partitions) {
const sessionDir = this.dirForSession(parseInt(partition), projectId, sessionId)
const exists = await stat(sessionDir).catch(() => null)
if (!exists) {
continue
}
const fileStream = createReadStream(path.join(sessionDir, BUFFER_FILE_NAME))
fileStream.pipe(res)
return
}
res.sendStatus(404)
})
}
}
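For reference, the on-disk layout this consumer shares with SessionManagerV3, reconstructed from rootDir, dirForSession and the file names above (the EFS mount point itself is not part of this diff):

// <SESSION_RECORDING_LOCAL_DIRECTORY>/session-recordings/
//   <partition>/
//     <teamId>__<sessionId>/
//       buffer.jsonl                   // open append-only buffer for incoming events
//       metadata.json                  // persisted SessionManagerBufferContext
//       <firstTs>-<lastTs>.flush.jsonl // marked for gzip + upload to S3
//
// The /api/projects/:projectId/session_recordings/:sessionId/snapshots route above streams
// buffer.jsonl from whichever partition directory contains the session; the Django view further
// down proxies to it when ?version=3 is requested and RECORDINGS_INGESTER_URL is set.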

View File

@ -39,6 +39,7 @@ import {
} from './ingestion-queues/on-event-handler-consumer'
import { startScheduledTasksConsumer } from './ingestion-queues/scheduled-tasks-consumer'
import { SessionRecordingIngester } from './ingestion-queues/session-recording/session-recordings-consumer'
import { SessionRecordingIngesterV3 } from './ingestion-queues/session-recording/session-recordings-consumer-v3'
import { setupCommonRoutes } from './services/http-server'
import { getObjectStorage } from './services/object_storage'
@ -451,6 +452,27 @@ export async function startPluginsServer(
}
}
if (capabilities.sessionRecordingV3Ingestion) {
const recordingConsumerConfig = sessionRecordingConsumerConfig(serverConfig)
const postgres = hub?.postgres ?? new PostgresRouter(serverConfig)
const s3 = hub?.objectStorage ?? getObjectStorage(recordingConsumerConfig)
if (!s3) {
throw new Error("Can't start session recording ingestion without object storage")
}
// NOTE: We intentionally pass in the original serverConfig as the ingester uses both kafkas
const ingester = new SessionRecordingIngesterV3(serverConfig, postgres, s3)
await ingester.start()
const batchConsumer = ingester.batchConsumer
if (batchConsumer) {
stopSessionRecordingBlobConsumer = () => ingester.stop()
shutdownOnConsumerExit(batchConsumer)
healthChecks['session-recordings-ingestion'] = () => ingester.isHealthy() ?? false
}
}
if (capabilities.personOverrides) {
const postgres = hub?.postgres ?? new PostgresRouter(serverConfig)
const kafkaProducer = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig))

View File

@ -77,6 +77,7 @@ export enum PluginServerMode {
scheduler = 'scheduler',
analytics_ingestion = 'analytics-ingestion',
recordings_blob_ingestion = 'recordings-blob-ingestion',
recordings_ingestion_v3 = 'recordings-ingestion-v3',
person_overrides = 'person-overrides',
}
@ -298,6 +299,7 @@ export interface PluginServerCapabilities {
processAsyncOnEventHandlers?: boolean
processAsyncWebhooksHandlers?: boolean
sessionRecordingBlobIngestion?: boolean
sessionRecordingV3Ingestion?: boolean
personOverrides?: boolean
appManagementSingleton?: boolean
preflightSchedules?: boolean // Used for instance health checks on hobby deploy, not useful on cloud
@ -950,117 +952,6 @@ export interface RawSessionReplayEvent {
/* TODO what columns do we need */
}
export interface RawPerformanceEvent {
uuid: string
team_id: number
distinct_id: string
session_id: string
window_id: string
pageview_id: string
current_url: string
// BASE_EVENT_COLUMNS
time_origin: number
timestamp: string
entry_type: string
name: string
// RESOURCE_EVENT_COLUMNS
start_time: number
redirect_start: number
redirect_end: number
worker_start: number
fetch_start: number
domain_lookup_start: number
domain_lookup_end: number
connect_start: number
secure_connection_start: number
connect_end: number
request_start: number
response_start: number
response_end: number
decoded_body_size: number
encoded_body_size: number
duration: number
initiator_type: string
next_hop_protocol: string
render_blocking_status: string
response_status: number
transfer_size: number
// LARGEST_CONTENTFUL_PAINT_EVENT_COLUMNS
largest_contentful_paint_element: string
largest_contentful_paint_render_time: number
largest_contentful_paint_load_time: number
largest_contentful_paint_size: number
largest_contentful_paint_id: string
largest_contentful_paint_url: string
// NAVIGATION_EVENT_COLUMNS
dom_complete: number
dom_content_loaded_event: number
dom_interactive: number
load_event_end: number
load_event_start: number
redirect_count: number
navigation_type: string
unload_event_end: number
unload_event_start: number
}
export const PerformanceEventReverseMapping: { [key: number]: keyof RawPerformanceEvent } = {
// BASE_PERFORMANCE_EVENT_COLUMNS
0: 'entry_type',
1: 'time_origin',
2: 'name',
// RESOURCE_EVENT_COLUMNS
3: 'start_time',
4: 'redirect_start',
5: 'redirect_end',
6: 'worker_start',
7: 'fetch_start',
8: 'domain_lookup_start',
9: 'domain_lookup_end',
10: 'connect_start',
11: 'secure_connection_start',
12: 'connect_end',
13: 'request_start',
14: 'response_start',
15: 'response_end',
16: 'decoded_body_size',
17: 'encoded_body_size',
18: 'initiator_type',
19: 'next_hop_protocol',
20: 'render_blocking_status',
21: 'response_status',
22: 'transfer_size',
// LARGEST_CONTENTFUL_PAINT_EVENT_COLUMNS
23: 'largest_contentful_paint_element',
24: 'largest_contentful_paint_render_time',
25: 'largest_contentful_paint_load_time',
26: 'largest_contentful_paint_size',
27: 'largest_contentful_paint_id',
28: 'largest_contentful_paint_url',
// NAVIGATION_EVENT_COLUMNS
29: 'dom_complete',
30: 'dom_content_loaded_event',
31: 'dom_interactive',
32: 'load_event_end',
33: 'load_event_start',
34: 'redirect_count',
35: 'navigation_type',
36: 'unload_event_end',
37: 'unload_event_start',
// Added after v1
39: 'duration',
40: 'timestamp',
}
export enum TimestampFormat {
ClickHouseSecondPrecision = 'clickhouse-second-precision',
ClickHouse = 'clickhouse',

View File

@ -2,7 +2,6 @@ import { DateTime } from 'luxon'
import {
ConsoleLogEntry,
createPerformanceEvent,
createSessionReplayEvent,
gatherConsoleLogEvents,
getTimestampsFrom,
@ -414,73 +413,4 @@ describe('session recording process event', () => {
} satisfies ConsoleLogEntry,
])
})
it('performance event stored as performance_event', () => {
const data = createPerformanceEvent('some-id', 12345, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', {
// Taken from a real event from the JS
'0': 'resource',
'1': 1671723295836,
'2': 'http://localhost:8000/api/projects/1/session_recordings',
'3': 10737.89999999106,
'4': 0,
'5': 0,
'6': 0,
'7': 10737.89999999106,
'8': 10737.89999999106,
'9': 10737.89999999106,
'10': 10737.89999999106,
'11': 0,
'12': 10737.89999999106,
'13': 10745.09999999404,
'14': 11121.70000000298,
'15': 11122.20000000298,
'16': 73374,
'17': 1767,
'18': 'fetch',
'19': 'http/1.1',
'20': 'non-blocking',
'22': 2067,
'39': 384.30000001192093,
'40': 1671723306573,
token: 'phc_234',
$session_id: '1853a793ad26c1-0eea05631cbeff-17525635-384000-1853a793ad31dd2',
$window_id: '1853a793ad424a5-017f7473b057f1-17525635-384000-1853a793ad524dc',
distinct_id: '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4',
$current_url: 'http://localhost:8000/recordings/recent',
})
expect(data).toEqual({
connect_end: 10737.89999999106,
connect_start: 10737.89999999106,
current_url: 'http://localhost:8000/recordings/recent',
decoded_body_size: 73374,
distinct_id: '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4',
domain_lookup_end: 10737.89999999106,
domain_lookup_start: 10737.89999999106,
duration: 384.30000001192093,
encoded_body_size: 1767,
entry_type: 'resource',
fetch_start: 10737.89999999106,
initiator_type: 'fetch',
name: 'http://localhost:8000/api/projects/1/session_recordings',
next_hop_protocol: 'http/1.1',
pageview_id: undefined,
redirect_end: 0,
redirect_start: 0,
render_blocking_status: 'non-blocking',
request_start: 10745.09999999404,
response_end: 11122.20000000298,
response_start: 11121.70000000298,
secure_connection_start: 0,
session_id: '1853a793ad26c1-0eea05631cbeff-17525635-384000-1853a793ad31dd2',
start_time: 10737.89999999106,
team_id: 12345,
time_origin: 1671723295836,
timestamp: 1671723306573,
transfer_size: 2067,
uuid: 'some-id',
window_id: '1853a793ad424a5-017f7473b057f1-17525635-384000-1853a793ad524dc',
worker_start: 0,
})
})
})

View File

@ -0,0 +1,251 @@
import { Upload } from '@aws-sdk/lib-storage'
import { randomUUID } from 'crypto'
import fs from 'fs/promises'
import { DateTime, Settings } from 'luxon'
import path from 'path'
import { PassThrough } from 'stream'
import * as zlib from 'zlib'
import { defaultConfig } from '../../../../../src/config/config'
import { SessionManagerV3 } from '../../../../../src/main/ingestion-queues/session-recording/services/session-manager-v3'
import { now } from '../../../../../src/main/ingestion-queues/session-recording/utils'
import { createIncomingRecordingMessage } from '../fixtures'
jest.mock('@aws-sdk/lib-storage', () => {
const mockUpload = jest.fn().mockImplementation(() => {
return {
abort: jest.fn().mockResolvedValue(undefined),
done: jest.fn().mockResolvedValue(undefined),
}
})
return {
__esModule: true,
Upload: mockUpload,
}
})
const tmpDir = path.join(__dirname, '../../../../../.tmp/test_session_recordings')
describe('session-manager', () => {
jest.setTimeout(1000)
let sessionManager: SessionManagerV3
const mockS3Client: any = {
send: jest.fn(),
}
const createSessionManager = async (
sessionId = randomUUID(),
teamId = 1,
partition = 1
): Promise<SessionManagerV3> => {
return await SessionManagerV3.create(defaultConfig, mockS3Client, {
sessionId,
teamId,
partition,
dir: path.join(tmpDir, `${partition}`, `${teamId}__${sessionId}`),
})
}
const flushThreshold = defaultConfig.SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS * 1000
beforeEach(async () => {
// it's always May 25
Settings.now = () => new Date(2018, 4, 25).valueOf()
if (await fs.stat(tmpDir).catch(() => null)) {
await fs.rmdir(tmpDir, { recursive: true })
}
sessionManager = await createSessionManager()
})
afterEach(async () => {
await sessionManager?.stop()
// it's no longer always May 25
Settings.now = () => new Date().valueOf()
})
it('adds a message', async () => {
const timestamp = now()
const event = createIncomingRecordingMessage({
events: [
{ timestamp: timestamp, type: 4, data: { href: 'http://localhost:3001/' } },
{ timestamp: timestamp + 1000, type: 4, data: { href: 'http://localhost:3001/' } },
],
})
await sessionManager.add(event)
expect(sessionManager.buffer?.context).toEqual({
sizeEstimate: 193,
count: 1,
eventsRange: { firstTimestamp: timestamp, lastTimestamp: timestamp + 1000 },
createdAt: timestamp,
})
const stats = await fs.stat(`${sessionManager.context.dir}/buffer.jsonl`)
expect(stats.size).toBeGreaterThan(0)
})
it('does not flush if it has received a message recently', async () => {
const now = DateTime.now()
const event = createIncomingRecordingMessage({
metadata: {
timestamp: now,
} as any,
})
await sessionManager.add(event)
await sessionManager.flush()
expect(await fs.readdir(sessionManager.context.dir)).toEqual(['buffer.jsonl', 'metadata.json'])
})
it('does flush if the stored file is older than the threshold', async () => {
const firstTimestamp = 1700000000000
const lastTimestamp = 1700000000000 + 4000
const eventOne = createIncomingRecordingMessage({
events: [{ timestamp: firstTimestamp, type: 4, data: { href: 'http://localhost:3001/' } }],
})
const eventTwo = createIncomingRecordingMessage({
events: [{ timestamp: lastTimestamp, type: 4, data: { href: 'http://localhost:3001/' } }],
})
await sessionManager.add(eventOne)
await sessionManager.add(eventTwo)
sessionManager.buffer!.context.createdAt = now() - flushThreshold - 1
await sessionManager.flush()
expect(await fs.readdir(sessionManager.context.dir)).toEqual([])
const sessionId = sessionManager.context.sessionId
// as a proxy for flush having been called or not
const mockUploadCalls = (Upload as unknown as jest.Mock).mock.calls
expect(mockUploadCalls.length).toBe(1)
expect(mockUploadCalls[0].length).toBe(1)
expect(mockUploadCalls[0][0]).toEqual(
expect.objectContaining({
params: expect.objectContaining({
Key: `session_recordings/team_id/1/session_id/${sessionId}/data/${firstTimestamp}-${lastTimestamp}.jsonl`,
}),
})
)
})
it('has a fixed jitter based on the serverConfig', async () => {
const minJitter = 1 - defaultConfig.SESSION_RECORDING_BUFFER_AGE_JITTER
for (const _ of Array(100).keys()) {
const sm = await createSessionManager()
expect(sm.flushJitterMultiplier).toBeGreaterThanOrEqual(minJitter)
expect(sm.flushJitterMultiplier).toBeLessThanOrEqual(1)
}
})
it('does not remove files when stopped', async () => {
expect(await fs.readdir(sessionManager.context.dir)).toEqual([])
await sessionManager.add(createIncomingRecordingMessage())
expect(await fs.readdir(sessionManager.context.dir)).toEqual(['buffer.jsonl', 'metadata.json'])
await sessionManager.stop()
expect(await fs.readdir(sessionManager.context.dir)).toEqual(['buffer.jsonl', 'metadata.json'])
})
it('removes the directory when stopped after fully flushed', async () => {
const sm = await createSessionManager('session_id_2', 2, 2)
expect(await fs.readdir(sm.context.dir)).toEqual([])
await sm.add(createIncomingRecordingMessage())
expect(await fs.readdir(sm.context.dir)).toEqual(['buffer.jsonl', 'metadata.json'])
await sm.flush(true)
expect(await fs.readdir(sm.context.dir)).toEqual([])
await sm.stop()
// ;(sessionManager as any) = undefined // Stop the afterEach from failing
await expect(fs.stat(sm.context.dir)).rejects.toThrowError('ENOENT: no such file or directory')
})
it('reads successfully with the stream not closed', async () => {
const event = createIncomingRecordingMessage({
events: [
{ timestamp: 170000000, type: 4, data: { href: 'http://localhost:3001/' } },
{ timestamp: 170000000 + 1000, type: 4, data: { href: 'http://localhost:3001/' } },
],
})
await sessionManager.add(event)
const content = await fs.readFile(`${sessionManager.context.dir}/buffer.jsonl`, 'utf-8')
expect(content).toEqual(
'{"window_id":"window_id_1","data":[{"timestamp":170000000,"type":4,"data":{"href":"http://localhost:3001/"}},{"timestamp":170001000,"type":4,"data":{"href":"http://localhost:3001/"}}]}\n'
)
})
it('adds to the existing buffer when restarted', async () => {
const sm1 = await createSessionManager('session_id_2', 2, 2)
await sm1.add(
createIncomingRecordingMessage({
events: [
{ timestamp: 170000000, type: 4, data: { href: 'http://localhost:3001/' } },
{ timestamp: 170000000 + 1000, type: 4, data: { href: 'http://localhost:3001/' } },
],
})
)
const sm2 = await createSessionManager('session_id_2', 2, 2)
await sm2.add(
createIncomingRecordingMessage({
events: [
{ timestamp: 170000000 + 2000, type: 4, data: { href: 'http://localhost:3001/' } },
{ timestamp: 170000000 + 3000, type: 4, data: { href: 'http://localhost:3001/' } },
],
})
)
const buffer = await fs.readFile(`${sm1.context.dir}/buffer.jsonl`, 'utf-8')
expect(buffer).toEqual(
'{"window_id":"window_id_1","data":[{"timestamp":170000000,"type":4,"data":{"href":"http://localhost:3001/"}},{"timestamp":170001000,"type":4,"data":{"href":"http://localhost:3001/"}}]}\n{"window_id":"window_id_1","data":[{"timestamp":170002000,"type":4,"data":{"href":"http://localhost:3001/"}},{"timestamp":170003000,"type":4,"data":{"href":"http://localhost:3001/"}}]}\n'
)
await sm1.stop()
await sm2.stop()
})
it('uploads a gzip compressed file to S3', async () => {
await sessionManager.add(
createIncomingRecordingMessage({
events: [{ timestamp: 170000000, type: 4, data: { href: 'http://localhost:3001/' } }],
})
)
await sessionManager.flush(true)
const mockUploadCalls = (Upload as unknown as jest.Mock).mock.calls
expect(mockUploadCalls[0][0]).toMatchObject({
params: {
Bucket: 'posthog',
Key: `session_recordings/team_id/1/session_id/${sessionManager.context.sessionId}/data/170000000-170000000.jsonl`,
ContentEncoding: 'gzip',
ContentType: 'application/json',
},
})
const uploadBody = mockUploadCalls[0][0].params.Body
expect(uploadBody).toBeInstanceOf(PassThrough)
// Extract the content from the stream and gzip decompress it
const chunks: Uint8Array[] = []
for await (const chunk of uploadBody) {
chunks.push(chunk)
}
const buffer = Buffer.concat(chunks)
const uncompressed = zlib.gunzipSync(buffer).toString('utf-8')
expect(uncompressed).toEqual(
'{"window_id":"window_id_1","data":[{"timestamp":170000000,"type":4,"data":{"href":"http://localhost:3001/"}}]}\n'
)
})
})

View File

@ -0,0 +1,295 @@
import { randomUUID } from 'crypto'
import fs from 'fs/promises'
import { mkdirSync, rmSync } from 'node:fs'
import { TopicPartition, TopicPartitionOffset } from 'node-rdkafka'
import path from 'path'
import { waitForExpect } from '../../../../functional_tests/expectations'
import { defaultConfig } from '../../../../src/config/config'
import {
SessionManagerBufferContext,
SessionManagerContext,
} from '../../../../src/main/ingestion-queues/session-recording/services/session-manager-v3'
import { SessionRecordingIngesterV3 } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-consumer-v3'
import { Hub, PluginsServerConfig, Team } from '../../../../src/types'
import { createHub } from '../../../../src/utils/db/hub'
import { getFirstTeam, resetTestDatabase } from '../../../helpers/sql'
import { createIncomingRecordingMessage, createKafkaMessage, createTP } from './fixtures'
const SESSION_RECORDING_REDIS_PREFIX = '@posthog-tests/replay/'
const tmpDir = path.join(__dirname, '../../../../.tmp/test_session_recordings')
const config: PluginsServerConfig = {
...defaultConfig,
SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION: true,
SESSION_RECORDING_REDIS_PREFIX,
SESSION_RECORDING_LOCAL_DIRECTORY: tmpDir,
}
async function deleteKeysWithPrefix(hub: Hub) {
const redisClient = await hub.redisPool.acquire()
const keys = await redisClient.keys(`${SESSION_RECORDING_REDIS_PREFIX}*`)
const pipeline = redisClient.pipeline()
keys.forEach(function (key) {
pipeline.del(key)
})
await pipeline.exec()
await hub.redisPool.release(redisClient)
}
const mockConsumer = {
on: jest.fn(),
commitSync: jest.fn(),
commit: jest.fn(),
queryWatermarkOffsets: jest.fn(),
committed: jest.fn(),
assignments: jest.fn(),
isConnected: jest.fn(() => true),
getMetadata: jest.fn(),
}
jest.mock('../../../../src/kafka/batch-consumer', () => {
return {
startBatchConsumer: jest.fn(() =>
Promise.resolve({
join: () => ({
finally: jest.fn(),
}),
stop: jest.fn(),
consumer: mockConsumer,
})
),
}
})
jest.setTimeout(1000)
describe('ingester', () => {
let ingester: SessionRecordingIngesterV3
let hub: Hub
let closeHub: () => Promise<void>
let team: Team
let teamToken = ''
let mockOffsets: Record<number, number> = {}
let mockCommittedOffsets: Record<number, number> = {}
beforeAll(async () => {
mkdirSync(path.join(config.SESSION_RECORDING_LOCAL_DIRECTORY, 'session-buffer-files'), { recursive: true })
await resetTestDatabase()
})
beforeEach(async () => {
if (await fs.stat(tmpDir).catch(() => null)) {
await fs.rmdir(tmpDir, { recursive: true })
}
// The below mocks simulate committing to kafka and querying the offsets
mockCommittedOffsets = {}
mockOffsets = {}
mockConsumer.commit.mockImplementation(
(tpo: TopicPartitionOffset) => (mockCommittedOffsets[tpo.partition] = tpo.offset)
)
mockConsumer.queryWatermarkOffsets.mockImplementation((_topic, partition, _timeout, cb) => {
cb(null, { highOffset: mockOffsets[partition] ?? 1, lowOffset: 0 })
})
mockConsumer.getMetadata.mockImplementation((options, cb) => {
cb(null, {
topics: [{ name: options.topic, partitions: [{ id: 0 }, { id: 1 }, { id: 2 }] }],
})
})
mockConsumer.committed.mockImplementation((topicPartitions: TopicPartition[], _timeout, cb) => {
const tpos: TopicPartitionOffset[] = topicPartitions.map((tp) => ({
topic: tp.topic,
partition: tp.partition,
offset: mockCommittedOffsets[tp.partition] ?? 1,
}))
cb(null, tpos)
})
;[hub, closeHub] = await createHub()
team = await getFirstTeam(hub)
teamToken = team.api_token
await deleteKeysWithPrefix(hub)
ingester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage)
await ingester.start()
mockConsumer.assignments.mockImplementation(() => [createTP(0), createTP(1)])
})
afterEach(async () => {
jest.setTimeout(10000)
await deleteKeysWithPrefix(hub)
await ingester.stop()
await closeHub()
})
afterAll(() => {
rmSync(config.SESSION_RECORDING_LOCAL_DIRECTORY, { recursive: true, force: true })
jest.useRealTimers()
})
const createMessage = (session_id: string, partition = 1) => {
mockOffsets[partition] = mockOffsets[partition] ?? 0
mockOffsets[partition]++
return createKafkaMessage(
teamToken,
{
partition,
offset: mockOffsets[partition],
},
{
$session_id: session_id,
}
)
}
it('can parse debug partition config', () => {
const config = {
SESSION_RECORDING_DEBUG_PARTITION: '103',
KAFKA_HOSTS: 'localhost:9092',
} satisfies Partial<PluginsServerConfig> as PluginsServerConfig
const ingester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage)
expect(ingester['debugPartition']).toEqual(103)
})
it('can parse absence of debug partition config', () => {
const config = {
KAFKA_HOSTS: 'localhost:9092',
} satisfies Partial<PluginsServerConfig> as PluginsServerConfig
const ingester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage)
expect(ingester['debugPartition']).toBeUndefined()
})
it('creates a new session manager if needed', async () => {
const event = createIncomingRecordingMessage()
await ingester.consume(event)
await waitForExpect(() => {
expect(Object.keys(ingester.sessions).length).toBe(1)
expect(ingester.sessions['1__session_id_1']).toBeDefined()
})
})
it('handles multiple incoming sessions', async () => {
const event = createIncomingRecordingMessage()
const event2 = createIncomingRecordingMessage({
session_id: 'session_id_2',
})
await Promise.all([ingester.consume(event), ingester.consume(event2)])
expect(Object.keys(ingester.sessions).length).toBe(2)
expect(ingester.sessions['1__session_id_1']).toBeDefined()
expect(ingester.sessions['1__session_id_2']).toBeDefined()
})
it('destroys a session manager if finished', async () => {
const sessionId = `destroys-a-session-manager-if-finished-${randomUUID()}`
const event = createIncomingRecordingMessage({
session_id: sessionId,
})
await ingester.consume(event)
expect(ingester.sessions[`1__${sessionId}`]).toBeDefined()
ingester.sessions[`1__${sessionId}`].buffer!.context.createdAt = 0
await ingester.flushAllReadySessions()
await waitForExpect(() => {
expect(ingester.sessions[`1__${sessionId}`]).not.toBeDefined()
}, 10000)
})
describe('simulated rebalancing', () => {
let otherIngester: SessionRecordingIngesterV3
jest.setTimeout(5000) // Increased to cover lock delay
beforeEach(async () => {
otherIngester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage)
await otherIngester.start()
})
afterEach(async () => {
await otherIngester.stop()
})
const getSessions = (
ingester: SessionRecordingIngesterV3
): (SessionManagerContext & SessionManagerBufferContext)[] =>
Object.values(ingester.sessions).map((x) => ({ ...x.context, ...x.buffer!.context }))
/**
* It is really hard to actually do rebalance tests against kafka, so we instead simulate the various methods and ensure the correct logic occurs
* Simulates the rebalance and tests that the handled sessions are successfully dropped and picked up
*/
it('rebalances new consumers', async () => {
const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)]
const partitionMsgs2 = [createMessage('session_id_3', 2), createMessage('session_id_4', 2)]
mockConsumer.assignments.mockImplementation(() => [createTP(1), createTP(2), createTP(3)])
await ingester.handleEachBatch([...partitionMsgs1, ...partitionMsgs2])
expect(getSessions(ingester)).toMatchObject([
{ sessionId: 'session_id_1', partition: 1, count: 1 },
{ sessionId: 'session_id_2', partition: 1, count: 1 },
{ sessionId: 'session_id_3', partition: 2, count: 1 },
{ sessionId: 'session_id_4', partition: 2, count: 1 },
])
// Call handleEachBatch with both consumers - we simulate the assignments which
// is what is responsible for the actual syncing of the sessions
mockConsumer.assignments.mockImplementation(() => [createTP(2), createTP(3)])
await otherIngester.handleEachBatch([createMessage('session_id_4', 2), createMessage('session_id_5', 2)])
mockConsumer.assignments.mockImplementation(() => [createTP(1)])
await ingester.handleEachBatch([createMessage('session_id_1', 1)])
// Should still have the partition 1 sessions that didn't move, with added events
expect(getSessions(ingester)).toMatchObject([
{ sessionId: 'session_id_1', partition: 1, count: 2 },
{ sessionId: 'session_id_2', partition: 1, count: 1 },
])
expect(getSessions(otherIngester)).toMatchObject([
{ sessionId: 'session_id_3', partition: 2, count: 1 },
{ sessionId: 'session_id_4', partition: 2, count: 2 },
{ sessionId: 'session_id_5', partition: 2, count: 1 },
])
})
})
describe('stop()', () => {
const setup = async (): Promise<void> => {
const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)]
await ingester.handleEachBatch(partitionMsgs1)
}
// TODO: Unskip when we add back in the replay and console ingestion
it('shuts down without error', async () => {
await setup()
await expect(ingester.stop()).resolves.toMatchObject([
// destroy sessions,
{ status: 'fulfilled' },
// // stop replay ingester
// { status: 'fulfilled' },
// // stop console ingester
// { status: 'fulfilled' },
])
})
})
describe('when a team is disabled', () => {
it('ignores invalid teams', async () => {
// non-zero offset because the code can't commit offset 0
await ingester.handleEachBatch([
createKafkaMessage('invalid_token', { offset: 12 }),
createKafkaMessage('invalid_token', { offset: 13 }),
])
expect(ingester.sessions).toEqual({})
})
})
})

View File

@ -342,7 +342,10 @@ class SessionRecordingViewSet(TeamAndOrgViewSetMixin, viewsets.GenericViewSet):
for full_key in blob_keys:
# Keys are like 1619712000-1619712060
blob_key = full_key.replace(blob_prefix.rstrip("/") + "/", "")
time_range = [datetime.fromtimestamp(int(x) / 1000, tz=timezone.utc) for x in blob_key.split("-")]
blob_key_base = blob_key.split(".")[0] # Remove the extension if it exists
time_range = [
datetime.fromtimestamp(int(x) / 1000, tz=timezone.utc) for x in blob_key_base.split("-")
]
sources.append(
{
@ -378,7 +381,19 @@ class SessionRecordingViewSet(TeamAndOrgViewSetMixin, viewsets.GenericViewSet):
response_data["sources"] = sources
elif source == "realtime":
snapshots = get_realtime_snapshots(team_id=self.team.pk, session_id=str(recording.session_id)) or []
if request.GET.get("version", None) == "3" and settings.RECORDINGS_INGESTER_URL:
with requests.get(
url=f"{settings.RECORDINGS_INGESTER_URL}/api/projects/{self.team.pk}/session_recordings/{str(recording.session_id)}/snapshots",
stream=True,
) as r:
if r.status_code == 404:
return Response({"snapshots": []})
response = HttpResponse(content=r.raw, content_type="application/json")
response["Content-Disposition"] = "inline"
return response
else:
snapshots = get_realtime_snapshots(team_id=self.team.pk, session_id=str(recording.session_id)) or []
event_properties["source"] = "realtime"
event_properties["snapshots_length"] = len(snapshots)

View File

@ -23,3 +23,5 @@ REPLAY_EMBEDDINGS_CALCULATION_CELERY_INTERVAL_SECONDS = get_from_env(
)
REPLAY_EMBEDDINGS_ALLOWED_TEAMS: List[str] = get_list(get_from_env("REPLAY_EMBEDDINGS_ALLOWED_TEAM", "", type_cast=str))
RECORDINGS_INGESTER_URL = get_from_env("RECORDINGS_INGESTER_URL", "")