
feat: S3 backed recording ingestion (take 2) (#14864)

Ben White 2023-04-25 11:43:07 +02:00 committed by GitHub
parent db6d8598b5
commit fdb2c71a39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 2775 additions and 89 deletions


@ -33,6 +33,8 @@ env:
OBJECT_STORAGE_SECRET_ACCESS_KEY: 'object_storage_root_password'
OBJECT_STORAGE_SESSION_RECORDING_FOLDER: 'session_recordings'
OBJECT_STORAGE_BUCKET: 'posthog'
# set the max buffer size small enough that the functional tests behave the same in CI as when running locally
SESSION_RECORDING_MAX_BUFFER_SIZE_KB: 1024
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}


@ -8,3 +8,4 @@ yalc.lock
tmp
*.0x
coverage/
.tmp/


@ -0,0 +1,150 @@
import { GetObjectCommand, GetObjectCommandOutput, ListObjectsV2Command, S3Client } from '@aws-sdk/client-s3'
import fs from 'fs'
import { Consumer, Kafka, KafkaMessage, logLevel } from 'kafkajs'
import * as zlib from 'zlib'
import { defaultConfig } from '../src/config/config'
import { compressToString } from '../src/main/ingestion-queues/session-recording/blob-ingester/utils'
import { getObjectStorage } from '../src/main/services/object_storage'
import { UUIDT } from '../src/utils/utils'
import { capture, createOrganization, createTeam } from './api'
import { waitForExpect } from './expectations'
let kafka: Kafka
let organizationId: string
let dlq: KafkaMessage[]
let dlqConsumer: Consumer
let s3: S3Client
function generateVeryLongString(length = 1025) {
return [...Array(length)].map(() => Math.random().toString(36)[2]).join('')
}
beforeAll(async () => {
kafka = new Kafka({ brokers: [defaultConfig.KAFKA_HOSTS], logLevel: logLevel.NOTHING })
dlq = []
dlqConsumer = kafka.consumer({ groupId: 'session_recording_events_test' })
await dlqConsumer.subscribe({ topic: 'session_recording_events_dlq' })
await dlqConsumer.run({
eachMessage: ({ message }) => {
dlq.push(message)
return Promise.resolve()
},
})
organizationId = await createOrganization()
const objectStorage = getObjectStorage({
OBJECT_STORAGE_ENDPOINT: defaultConfig.OBJECT_STORAGE_ENDPOINT,
OBJECT_STORAGE_REGION: defaultConfig.OBJECT_STORAGE_REGION,
OBJECT_STORAGE_ACCESS_KEY_ID: defaultConfig.OBJECT_STORAGE_ACCESS_KEY_ID,
OBJECT_STORAGE_SECRET_ACCESS_KEY: defaultConfig.OBJECT_STORAGE_SECRET_ACCESS_KEY,
OBJECT_STORAGE_ENABLED: defaultConfig.OBJECT_STORAGE_ENABLED,
OBJECT_STORAGE_BUCKET: defaultConfig.OBJECT_STORAGE_BUCKET,
})
if (!objectStorage) {
throw new Error('S3 not configured')
}
s3 = objectStorage.s3
})
afterAll(async () => {
await dlqConsumer.disconnect()
})
test.skip(`single recording event writes data to local tmp file`, async () => {
const teamId = await createTeam(organizationId)
const distinctId = new UUIDT().toString()
const uuid = new UUIDT().toString()
const sessionId = new UUIDT().toString()
const veryLongString = generateVeryLongString()
await capture({
teamId,
distinctId,
uuid,
event: '$snapshot',
properties: {
$session_id: sessionId,
$window_id: 'abc1234',
$snapshot_data: { data: compressToString(veryLongString), chunk_count: 1 },
},
})
let tempFiles: string[] = []
await waitForExpect(async () => {
const files = await fs.promises.readdir(defaultConfig.SESSION_RECORDING_LOCAL_DIRECTORY)
tempFiles = files.filter((f) => f.startsWith(`${teamId}.${sessionId}`))
expect(tempFiles.length).toBe(1)
})
await waitForExpect(async () => {
const currentFile = tempFiles[0]
const fileContents = await fs.promises.readFile(
`${defaultConfig.SESSION_RECORDING_LOCAL_DIRECTORY}/${currentFile}`,
'utf8'
)
expect(fileContents).toEqual(`{"window_id":"abc1234","data":"${veryLongString}"}\n`)
})
}, 40000)
test.skip(`multiple recording events writes compressed data to s3`, async () => {
const teamId = await createTeam(organizationId)
const distinctId = new UUIDT().toString()
const sessionId = new UUIDT().toString()
// need to send enough data to trigger the s3 upload exactly once.
// with a buffer of 1024, an estimated gzip compression of 0.1, and the 1025 default length for generateVeryLongString
// we need 25,000 events.
// if any of those things change then the number of events probably needs to change too
const captures = Array.from({ length: 25000 }).map(() => {
return capture({
teamId,
distinctId,
uuid: new UUIDT().toString(),
event: '$snapshot',
properties: {
$session_id: sessionId,
$window_id: 'abc1234',
$snapshot_data: { data: compressToString(generateVeryLongString()), chunk_count: 1 },
},
})
})
await Promise.all(captures)
await waitForExpect(async () => {
const s3Files = await s3.send(
new ListObjectsV2Command({
Bucket: defaultConfig.OBJECT_STORAGE_BUCKET,
Prefix: `${defaultConfig.SESSION_RECORDING_REMOTE_FOLDER}/team_id/${teamId}/session_id/${sessionId}`,
})
)
expect(s3Files.Contents?.length).toBe(1)
const s3File = s3Files.Contents?.[0]
if (!s3File) {
throw new Error('No s3File')
}
const s3FileContents: GetObjectCommandOutput = await s3.send(
new GetObjectCommand({
Bucket: defaultConfig.OBJECT_STORAGE_BUCKET,
Key: s3File.Key,
})
)
const fileStream = await s3FileContents.Body?.transformToByteArray()
if (!fileStream) {
throw new Error('No fileStream')
}
const text = zlib.gunzipSync(fileStream).toString().trim()
// text contains JSON for {
// "window_id": "abc1234",
// "data": "random...string" // thousands of characters
// }
expect(text).toMatch(/{"window_id":"abc1234","data":"\w+"}/)
})
}, 20000)


@ -54,6 +54,7 @@ test.concurrent(
const teamId = await createTeam(organizationId)
const distinctId = new UUIDT().toString()
const uuid = new UUIDT().toString()
const sessionId = new UUIDT().toString()
await capture({
teamId,
@ -61,7 +62,7 @@ test.concurrent(
uuid,
event: '$snapshot',
properties: {
$session_id: '1234abc',
$session_id: sessionId,
$window_id: 'abc1234',
$snapshot_data: 'yes way',
},
@ -84,7 +85,7 @@ test.concurrent(
has_full_snapshot: 0,
keypress_count: 0,
last_event_timestamp: null,
session_id: '1234abc',
session_id: sessionId,
snapshot_data: 'yes way',
team_id: teamId,
timestamp: expect.any(String),


@ -35,6 +35,8 @@
"repository": "https://github.com/PostHog/posthog-plugin-server",
"license": "MIT",
"dependencies": {
"@aws-sdk/client-s3": "^3.315.0",
"@aws-sdk/lib-storage": "^3.315.0",
"@babel/core": "^7.18.10",
"@babel/plugin-transform-react-jsx": "^7.18.10",
"@babel/preset-env": "^7.18.10",

File diff suppressed because it is too large


@ -15,6 +15,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
processPluginJobs: true,
processAsyncHandlers: true,
sessionRecordingIngestion: true,
sessionRecordingBlobIngestion: true,
...sharedCapabilities,
}
case 'ingestion':
@ -40,10 +41,14 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
}
case 'recordings-ingestion':
return {
mmdb: false,
sessionRecordingIngestion: true,
...sharedCapabilities,
}
case 'recordings-blob-ingestion':
return {
sessionRecordingBlobIngestion: true,
...sharedCapabilities,
}
case 'async':
return {


@ -96,11 +96,11 @@ export function getDefaultConfig(): PluginsServerConfig {
BUFFER_CONVERSION_SECONDS: isDevEnv() ? 2 : 60, // KEEP IN SYNC WITH posthog/settings/ingestion.py
PERSON_INFO_CACHE_TTL: 5 * 60, // 5 min
KAFKA_HEALTHCHECK_SECONDS: 20,
OBJECT_STORAGE_ENABLED: false,
OBJECT_STORAGE_ENABLED: true,
OBJECT_STORAGE_ENDPOINT: 'http://localhost:19000',
OBJECT_STORAGE_REGION: 'us-east-1',
OBJECT_STORAGE_ACCESS_KEY_ID: 'object_storage_root_user',
OBJECT_STORAGE_SECRET_ACCESS_KEY: 'object_storage_root_password',
OBJECT_STORAGE_SESSION_RECORDING_FOLDER: 'session_recordings',
OBJECT_STORAGE_BUCKET: 'posthog',
PLUGIN_SERVER_MODE: null,
KAFKAJS_LOG_LEVEL: 'WARN',
@ -112,6 +112,14 @@ export function getDefaultConfig(): PluginsServerConfig {
MAX_TEAM_ID_TO_BUFFER_ANONYMOUS_EVENTS_FOR: 0,
USE_KAFKA_FOR_SCHEDULED_TASKS: true,
CLOUD_DEPLOYMENT: 'default', // Used as a Sentry tag
SESSION_RECORDING_BLOB_PROCESSING_TEAMS: '', // TODO: BW Change this to 'all' when we release it fully
SESSION_RECORDING_LOCAL_DIRECTORY: '.tmp/sessions',
SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS: 60 * 10, // NOTE: 10 minutes
SESSION_RECORDING_MAX_BUFFER_SIZE_KB: ['dev', 'test'].includes(process.env.NODE_ENV || 'undefined')
? 1024 // NOTE: ~100KB in dev or test, so that even with gzipped content we still flush pretty frequently
: 1024 * 50, // ~50MB after compression in prod
SESSION_RECORDING_REMOTE_FOLDER: 'session_recordings',
}
}
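A hedged sketch of pointing a local plugin server at the new mode and tightening these new defaults. The spread-over-defaults pattern and the import paths are assumptions; in practice the values are usually supplied as environment variables, as the CI workflow change above does for SESSION_RECORDING_MAX_BUFFER_SIZE_KB.

// Sketch (not part of the diff): a config for running only the blob ingestion workload locally.
import { getDefaultConfig } from './config/config'
import { PluginsServerConfig } from './types'

const localConfig: PluginsServerConfig = {
    ...getDefaultConfig(),
    PLUGIN_SERVER_MODE: 'recordings-blob-ingestion', // maps to the sessionRecordingBlobIngestion capability
    SESSION_RECORDING_BLOB_PROCESSING_TEAMS: '1,2', // comma-separated team ids, or 'all'
    SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS: 60, // flush each session at least once a minute
    SESSION_RECORDING_MAX_BUFFER_SIZE_KB: 512, // flush at a smaller estimated gzipped size
}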


@ -1,5 +1,5 @@
import { defaultConfig } from './config/config'
import { connectObjectStorage } from './main/services/object_storage'
import { getObjectStorage } from './main/services/object_storage'
import { status } from './utils/status'
import { createRedis } from './utils/utils'
@ -23,11 +23,11 @@ const redisHealthcheck = async (): Promise<boolean> => {
}
const storageHealthcheck = async (): Promise<boolean> => {
if (!defaultConfig.OBJECT_STORAGE_ENABLED) {
const storage = getObjectStorage(defaultConfig)
if (!storage) {
return true
}
const storage = connectObjectStorage(defaultConfig)
try {
const storageHealthy = await storage.healthcheck()
if (storageHealthy) {


@ -1,4 +1,4 @@
import { GlobalConfig, Message } from 'node-rdkafka'
import { GlobalConfig, KafkaConsumer, Message } from 'node-rdkafka'
import { exponentialBuckets, Histogram } from 'prom-client'
import { status } from '../utils/status'
@ -11,6 +11,13 @@ import {
instrumentConsumerMetrics,
} from './consumer'
export interface BatchConsumer {
consumer: KafkaConsumer
join: () => Promise<void>
stop: () => Promise<void>
isHealthy: () => boolean
}
export const startBatchConsumer = async ({
connectionConfig,
groupId,
@ -21,6 +28,7 @@ export const startBatchConsumer = async ({
consumerMaxWaitMs,
fetchBatchSize,
eachBatch,
autoCommit = true,
}: {
connectionConfig: GlobalConfig
groupId: string
@ -31,7 +39,8 @@ export const startBatchConsumer = async ({
consumerMaxWaitMs: number
fetchBatchSize: number
eachBatch: (messages: Message[]) => Promise<void>
}) => {
autoCommit?: boolean
}): Promise<BatchConsumer> => {
// Starts consuming from `topic` in batches of `fetchBatchSize` messages,
// with consumer group id `groupId`. We use `connectionConfig` to connect
// to Kafka. We commit offsets after each batch has been processed,
@ -124,6 +133,11 @@ export const startBatchConsumer = async ({
const messages = await consumeMessages(consumer, fetchBatchSize)
status.debug('🔁', 'main_loop_consumed', { messagesLength: messages.length })
if (!messages.length) {
// For now, just skip empty batches and poll again
continue
}
consumerBatchSize.labels({ topic, groupId }).observe(messages.length)
for (const message of messages) {
consumedMessageSizeBytes.labels({ topic, groupId }).observe(message.size)
@ -133,7 +147,9 @@ export const startBatchConsumer = async ({
// the implementation of `eachBatch`.
await eachBatch(messages)
commitOffsetsForMessages(messages, consumer)
if (autoCommit) {
commitOffsetsForMessages(messages, consumer)
}
}
} catch (error) {
status.error('🔁', 'main_loop_error', { error })
@ -178,7 +194,7 @@ export const startBatchConsumer = async ({
}
}
return { isHealthy, stop, join }
return { isHealthy, stop, join, consumer }
}
export const consumerBatchSize = new Histogram({


@ -1,5 +1,6 @@
import {
ClientMetrics,
CODES,
ConsumerGlobalConfig,
KafkaConsumer as RdKafkaConsumer,
LibrdKafkaError,
@ -79,10 +80,20 @@ export const instrumentConsumerMetrics = (consumer: RdKafkaConsumer, groupId: st
// TODO: add other relevant metrics here
// TODO: expose the internal librdkafka metrics as well.
consumer.on('rebalance', (error: LibrdKafkaError, assignments: TopicPartition[]) => {
if (error) {
status.error('⚠️', 'rebalance_error', { error: error })
/**
* see https://github.com/Blizzard/node-rdkafka#rebalancing: the error argument is used to signal
* both real errors and non-error rebalance events
*
* When rebalancing starts the consumer receives ERR__REVOKE_PARTITIONS,
* and when rebalancing completes the new assignments are received with ERR__ASSIGN_PARTITIONS.
*/
if (error.code === CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
status.info('📝️', 'librdkafka rebalance, partitions assigned', { assignments })
} else if (error.code === CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
status.info('📝️', 'librdkafka rebalance started, partitions revoked', { assignments })
} else {
status.info('📝', 'librdkafka rebalance', { assignments: assignments })
// We had a "real" error
status.error('⚠️', 'rebalance_error', { error })
}
latestOffsetTimestampGauge.reset()
@ -160,10 +171,10 @@ export const disconnectConsumer = async (consumer: RdKafkaConsumer) => {
await new Promise((resolve, reject) => {
consumer.disconnect((error, data) => {
if (error) {
status.error('🔥', 'Failed to disconnect session recordings consumer', { error })
status.error('🔥', 'Failed to disconnect node-rdkafka consumer', { error })
reject(error)
} else {
status.info('🔁', 'Disconnected session recordings consumer')
status.info('🔁', 'Disconnected node-rdkafka consumer')
resolve(data)
}
})


@ -0,0 +1,117 @@
/**
* - A note on Kafka partitioning
*
* We are ingesting events and partitioning based on session_id. This means that we can have multiple sessions
* going to one partition, but all events for a given session ID should land on the same partition.
*
* As we want to buffer events before writing them to S3, we don't auto-commit our kafka consumer offsets.
* Instead, we track all offsets that are "in-flight" and when we flush a buffer to S3 we remove these in-flight offsets
* and write the oldest offset to Kafka. This allows us to resume from the oldest offset in the case of a consumer
* restart or rebalance, even if some of the following offsets have already been written to S3.
*
* The other trick is when a rebalance occurs we need to remove all in-flight sessions for partitions that are no longer
* assigned to us.
*
* This all works based on the idea that there is only one consumer (Orchestrator) per partition, allowing us to
* track everything in this single process
*/
import { KafkaConsumer } from 'node-rdkafka'
import { status } from '../../../../utils/status'
export class OffsetManager {
// We have to track every message's offset so that we can commit them only after they've been written to S3
offsetsByPartitionTopic: Map<string, number[]> = new Map()
constructor(private consumer: KafkaConsumer) {}
public addOffset(topic: string, partition: number, offset: number): void {
const key = `${topic}-${partition}`
if (!this.offsetsByPartitionTopic.has(key)) {
this.offsetsByPartitionTopic.set(key, [])
}
// TODO: We should parseInt when we handle the message
this.offsetsByPartitionTopic.get(key)?.push(offset)
}
/**
* When a rebalance occurs we need to remove all in-flight offsets for partitions that are no longer
* assigned to this consumer.
*
* @param topic
* @param assignedPartitions The partitions that are still assigned to this consumer after a rebalance
*/
public cleanPartitions(topic: string, assignedPartitions: number[]): void {
const assignedKeys = assignedPartitions.map((partition) => `${topic}-${partition}`)
const keysToDelete = new Set<string>()
for (const [key] of this.offsetsByPartitionTopic) {
if (!assignedKeys.includes(key)) {
keysToDelete.add(key)
}
}
keysToDelete.forEach((key) => {
this.offsetsByPartitionTopic.delete(key)
})
}
// TODO: Ensure all offsets passed here are already checked to be part of the same partition
public removeOffsets(topic: string, partition: number, offsets: number[]): number | undefined {
// TRICKY - We want to commit the highest removed offset that is still lower than the
// oldest offset left in flight, so that a restart never skips unflushed messages
// e.g. [3, 4, 8, 10] -> removing [3, 8] should end up with [4, 10] and commit 3
// e.g. [3, 4, 8, 10] -> removing [10] should end up with [3, 4, 8] and commit nothing
if (!offsets.length) {
return
}
let offsetToCommit: number | undefined
const offsetsToRemove = offsets.sort((a, b) => a - b)
const key = `${topic}-${partition}`
const inFlightOffsets = this.offsetsByPartitionTopic.get(key)
status.info('💾', `Current offsets: ${inFlightOffsets}`)
status.info('💾', `Removing offsets: ${offsets}`)
if (!inFlightOffsets) {
// TODO: Add a metric so that we can see if and when this happens
status.warn('💾', `No inflight offsets found for key: ${key}.`)
return
}
offsetsToRemove.forEach((offset) => {
// Remove from the list. If it is the lowest value - set it
const offsetIndex = inFlightOffsets.indexOf(offset)
if (offsetIndex >= 0) {
inFlightOffsets.splice(offsetIndex, 1)
}
// As the offsets are ordered we can simply check if we are removing from the start
// Higher offsets will update this value
if (offsetIndex === 0) {
offsetToCommit = offset
}
})
this.offsetsByPartitionTopic.set(key, inFlightOffsets)
if (offsetToCommit) {
status.info('💾', `Committing offset ${offsetToCommit} for ${topic}-${partition}`)
this.consumer.commit({
topic,
partition,
offset: offsetToCommit,
})
} else {
status.info('💾', `No offset to commit from: ${inFlightOffsets}`)
}
return offsetToCommit
}
}
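A minimal usage sketch of the commit behaviour described above. The consumer stub, topic name, and import path are illustrative only.

// Sketch (not part of the diff): only offsets below the oldest one still in flight get committed.
import { KafkaConsumer } from 'node-rdkafka'
import { OffsetManager } from './offset-manager'

const consumerStub = { commit: (args: unknown) => console.log('commit', args) } as unknown as KafkaConsumer
const offsetManager = new OffsetManager(consumerStub)

offsetManager.addOffset('session_recording_events', 1, 3)
offsetManager.addOffset('session_recording_events', 1, 4)
offsetManager.addOffset('session_recording_events', 1, 8)

// Flushing a buffer that held offsets 3 and 4 commits offset 4; offset 8 stays in flight.
offsetManager.removeOffsets('session_recording_events', 1, [3, 4])

// Flushing the buffer that held offset 8 then commits it as well.
offsetManager.removeOffsets('session_recording_events', 1, [8])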


@ -0,0 +1,257 @@
import { Upload } from '@aws-sdk/lib-storage'
import { captureException } from '@sentry/node'
import { randomUUID } from 'crypto'
import { createReadStream, writeFileSync } from 'fs'
import { appendFile, unlink } from 'fs/promises'
import path from 'path'
import { Counter } from 'prom-client'
import * as zlib from 'zlib'
import { PluginsServerConfig } from '../../../../types'
import { status } from '../../../../utils/status'
import { ObjectStorage } from '../../../services/object_storage'
import { IncomingRecordingMessage } from './types'
import { convertToPersistedMessage } from './utils'
export const counterS3FilesWritten = new Counter({
name: 'recording_s3_files_written',
help: 'The number of session recording blob files written to S3.',
labelNames: ['partition_key'],
})
const ESTIMATED_GZIP_COMPRESSION_RATIO = 0.1
// The buffer holds the recording data for a single session, appended to a local file until it is flushed to S3
type SessionBuffer = {
id: string
count: number
size: number
createdAt: Date
file: string
offsets: number[]
}
async function deleteFile(file: string) {
try {
await unlink(file)
} catch (err) {
if (err && err.code === 'ENOENT') {
status.warn('⚠️', 'blob_ingester_session_manager failed deleting file ' + file + ', file not found', {
err,
file,
})
return
}
status.error('🧨', 'blob_ingester_session_manager failed deleting file ' + file, { err, file })
captureException(err)
throw err
}
}
export class SessionManager {
chunks: Map<string, IncomingRecordingMessage[]> = new Map()
buffer: SessionBuffer
flushBuffer?: SessionBuffer
constructor(
public readonly serverConfig: PluginsServerConfig,
public readonly s3Client: ObjectStorage['s3'],
public readonly teamId: number,
public readonly sessionId: string,
public readonly partition: number,
public readonly topic: string,
private readonly onFinish: (offsetsToRemove: number[]) => void
) {
this.buffer = this.createBuffer()
// this.lastProcessedOffset = redis.get(`session-recording-last-offset-${this.sessionId}`) || 0
}
public async add(message: IncomingRecordingMessage): Promise<void> {
// TODO: Check that the offset is higher than the lastProcessed
// If not - ignore it
// If it is - update lastProcessed and process it
if (message.chunk_count === 1) {
await this.addToBuffer(message)
} else {
await this.addToChunks(message)
}
await this.flushIfNeccessary(true)
}
public get isEmpty(): boolean {
return this.buffer.count === 0 && this.chunks.size === 0
}
public async flushIfNeccessary(shouldLog = false): Promise<void> {
const bufferSizeKb = this.buffer.size / 1024
const gzipSizeKb = bufferSizeKb * ESTIMATED_GZIP_COMPRESSION_RATIO
const gzippedCapacity = gzipSizeKb / this.serverConfig.SESSION_RECORDING_MAX_BUFFER_SIZE_KB
if (shouldLog) {
status.info(
'🚽',
`blob_ingester_session_manager Buffer ${this.sessionId}:: buffer size: ${
this.serverConfig.SESSION_RECORDING_MAX_BUFFER_SIZE_KB
}kb capacity: ${(gzippedCapacity * 100).toFixed(2)}%: count: ${this.buffer.count} ${Math.round(
bufferSizeKb
)}KB (~ ${Math.round(gzipSizeKb)}KB GZIP) chunks: ${this.chunks.size})`,
{
sizeInBufferKB: bufferSizeKb,
estimatedSizeInGzipKB: gzipSizeKb,
bufferThreshold: this.serverConfig.SESSION_RECORDING_MAX_BUFFER_SIZE_KB,
calculatedCapacity: gzippedCapacity,
}
)
}
const overCapacity = gzippedCapacity > 1
const timeSinceLastFlushTooLong =
Date.now() - this.buffer.createdAt.getTime() >=
this.serverConfig.SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS * 1000
const readyToFlush = overCapacity || timeSinceLastFlushTooLong
if (readyToFlush) {
status.info('🚽', `blob_ingester_session_manager Flushing buffer ${this.sessionId}...`)
await this.flush()
}
}
/**
* Flushing takes the current buffered file and moves it to the flush buffer
* We then attempt to write the events to S3 and if successful, we clear the flush buffer
*/
public async flush(): Promise<void> {
if (this.flushBuffer) {
status.warn('⚠️', "blob_ingester_session_manager Flush called but we're already flushing")
return
}
// We move the buffer to the flush buffer and create a new buffer so that we can safely write the buffer to disk
this.flushBuffer = this.buffer
this.buffer = this.createBuffer()
try {
const baseKey = `${this.serverConfig.SESSION_RECORDING_REMOTE_FOLDER}/team_id/${this.teamId}/session_id/${this.sessionId}`
const dataKey = `${baseKey}/data/${this.flushBuffer.createdAt.getTime()}` // TODO: Change to be based on events times
// TODO: should we only compress above some size threshold? Depends how many uncompressed files we see below ~200kb
const fileStream = createReadStream(this.flushBuffer.file).pipe(zlib.createGzip())
const parallelUploads3 = new Upload({
client: this.s3Client,
params: {
Bucket: this.serverConfig.OBJECT_STORAGE_BUCKET,
Key: dataKey,
Body: fileStream,
},
})
await parallelUploads3.done()
counterS3FilesWritten.inc(1)
// TODO: Add prometheus metric for the size of the file as well
// counterS3FilesWritten.add(1, {
// bytes: this.flushBuffer.size, // since the file is compressed this is wrong, and we don't know the compressed size 🤔
// })
} catch (error) {
// TODO: If we fail to write to S3 we should do something about it
status.error('🧨', 'blob_ingester_session_manager failed writing session recording blob to S3', error)
captureException(error)
} finally {
await deleteFile(this.flushBuffer.file)
const offsets = this.flushBuffer.offsets
this.flushBuffer = undefined
status.debug(
'🚽',
`blob_ingester_session_manager Flushed buffer ${this.sessionId} (removing offsets: ${offsets})`
)
// TODO: Sync the last processed offset to redis
this.onFinish(offsets)
}
}
private createBuffer(): SessionBuffer {
const id = randomUUID()
const buffer = {
id,
count: 0,
size: 0,
createdAt: new Date(),
file: path.join(
this.serverConfig.SESSION_RECORDING_LOCAL_DIRECTORY,
`${this.teamId}.${this.sessionId}.${id}.jsonl`
),
offsets: [],
}
// NOTE: We can't easily do this asynchronously as we would need to handle the race condition of multiple events coming in at once.
writeFileSync(buffer.file, '', 'utf-8')
return buffer
}
/**
* Full messages (all chunks) are added to the buffer directly
*/
private async addToBuffer(message: IncomingRecordingMessage): Promise<void> {
const content = JSON.stringify(convertToPersistedMessage(message)) + '\n'
this.buffer.count += 1
this.buffer.size += Buffer.byteLength(content)
this.buffer.offsets.push(message.metadata.offset)
try {
await appendFile(this.buffer.file, content, 'utf-8')
} catch (e) {
status.error('🧨', 'blob_ingester_session_manager failed writing session recording buffer to disk', e)
captureException(e)
throw e
}
}
/**
* Chunked messages are added to the chunks map
* Once all chunks are received, the message is added to the buffer
*
*/
private async addToChunks(message: IncomingRecordingMessage): Promise<void> {
// If it is a chunked message we add to the collected chunks
let chunks: IncomingRecordingMessage[] = []
if (!this.chunks.has(message.chunk_id)) {
this.chunks.set(message.chunk_id, chunks)
} else {
chunks = this.chunks.get(message.chunk_id) || []
}
chunks.push(message)
if (chunks.length === message.chunk_count) {
// If we have all the chunks, we can add the message to the buffer
// We want to add all the chunk offsets as well so that they are tracked correctly
chunks.forEach((x) => {
this.buffer.offsets.push(x.metadata.offset)
})
await this.addToBuffer({
...message,
data: chunks
.sort((a, b) => a.chunk_index - b.chunk_index)
.map((c) => c.data)
.join(''),
})
this.chunks.delete(message.chunk_id)
}
}
public async destroy(): Promise<void> {
status.debug('␡', `blob_ingester_session_manager Destroying session manager ${this.sessionId}`)
const filePromises: Promise<void>[] = [this.flushBuffer?.file, this.buffer.file]
.filter((x): x is string => x !== undefined)
.map((x) => deleteFile(x))
await Promise.all(filePromises)
}
}
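A hedged sketch of driving a single SessionManager, mirroring the consumer wiring later in this commit. The helper, its assumed location alongside the blob-ingester modules, and the way config, storage and the OffsetManager are obtained are all illustrative.

// Sketch (not part of the diff): one SessionManager per team/session, with offsets released on flush.
import { OffsetManager } from './offset-manager'
import { SessionManager } from './session-manager'
import { IncomingRecordingMessage } from './types'
import { ObjectStorage } from '../../../services/object_storage'
import { PluginsServerConfig } from '../../../../types'

async function ingestOneMessage(
    config: PluginsServerConfig,
    storage: ObjectStorage,
    offsetManager: OffsetManager,
    message: IncomingRecordingMessage
): Promise<void> {
    const { topic, partition, offset } = message.metadata
    const manager = new SessionManager(
        config,
        storage.s3,
        message.team_id,
        message.session_id,
        partition,
        topic,
        // once a buffer lands in S3, release its offsets so the oldest one can be committed
        (offsets) => {
            offsetManager.removeOffsets(topic, partition, offsets)
        }
    )
    offsetManager.addOffset(topic, partition, offset)
    await manager.add(message) // appends to the local .jsonl buffer (or collects chunks) and may trigger a flush
}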


@ -0,0 +1,32 @@
// This is the incoming message from Kafka
export type IncomingRecordingMessage = {
metadata: {
topic: string
partition: number
offset: number
}
team_id: number
distinct_id: string
session_id: string
window_id?: string
// Properties data
chunk_id: string
chunk_index: number
chunk_count: number
data: string
compresssion: string
has_full_snapshot: boolean
events_summary: {
timestamp: number
type: number
data: any
}[]
}
// This is the incoming message from Kafka
export type PersistedRecordingMessage = {
window_id?: string
data: any
}
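For reference, an illustrative instance of the incoming shape; every value below is made up, including the topic and compression strings.

// Illustrative example only (not part of the diff).
import { IncomingRecordingMessage } from './types'

const example: IncomingRecordingMessage = {
    metadata: { topic: 'session_recording_events', partition: 1, offset: 42 },
    team_id: 1,
    distinct_id: 'distinct-id-123',
    session_id: 'session-abc',
    window_id: 'window-xyz',
    chunk_id: 'chunk-1',
    chunk_index: 0,
    chunk_count: 1,
    data: '<base64-encoded gzipped snapshot data>',
    compresssion: 'gzip-base64', // field name spelled as in the type above
    has_full_snapshot: true,
    events_summary: [{ timestamp: 1679568314158, type: 2, data: {} }],
}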


@ -0,0 +1,23 @@
import zlib from 'node:zlib'
import { IncomingRecordingMessage, PersistedRecordingMessage } from './types'
// NOTE: These functions are meant to be identical to those in posthog/session_recordings/session_recording_helpers.py
export function compressToString(input: string): string {
const compressed_data = zlib.gzipSync(Buffer.from(input, 'utf16le'))
return compressed_data.toString('base64')
}
export function decompressFromString(input: string): string {
const compressedData = Buffer.from(input, 'base64')
const uncompressed = zlib.gunzipSync(compressedData)
// Trim is a quick way to get rid of BOMs created by python
return uncompressed.toString('utf16le').trim()
}
export const convertToPersistedMessage = (message: IncomingRecordingMessage): PersistedRecordingMessage => {
return {
window_id: message.window_id,
data: decompressFromString(message.data),
}
}
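A quick round-trip check of the helpers above; the same behaviour is exercised by a unit test later in this commit, and the './utils' import assumes a sibling module.

// Sketch: compressToString and decompressFromString are inverses for plain strings.
import { compressToString, decompressFromString } from './utils'

const original = JSON.stringify([{ simple: 'data' }])
const roundTripped = decompressFromString(compressToString(original))
console.log(roundTripped === original) // true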


@ -0,0 +1,295 @@
import { mkdirSync, rmSync } from 'node:fs'
import { CODES, HighLevelProducer as RdKafkaProducer, Message } from 'node-rdkafka'
import { KAFKA_SESSION_RECORDING_EVENTS } from '../../../config/kafka-topics'
import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
import { createKafkaProducer, disconnectProducer } from '../../../kafka/producer'
import { PipelineEvent, PluginsServerConfig, RawEventMessage, Team } from '../../../types'
import { KafkaConfig } from '../../../utils/db/hub'
import { status } from '../../../utils/status'
import { TeamManager } from '../../../worker/ingestion/team-manager'
import { ObjectStorage } from '../../services/object_storage'
import { OffsetManager } from './blob-ingester/offset-manager'
import { SessionManager } from './blob-ingester/session-manager'
import { IncomingRecordingMessage } from './blob-ingester/types'
const groupId = 'session-recordings-blob'
const sessionTimeout = 30000
const fetchBatchSize = 500
export class SessionRecordingBlobIngester {
sessions: Map<string, SessionManager> = new Map()
offsetManager?: OffsetManager
batchConsumer?: BatchConsumer
producer?: RdKafkaProducer
lastHeartbeat: number = Date.now()
flushInterval: NodeJS.Timer | null = null
enabledTeams: number[] | null
constructor(
private teamManager: TeamManager,
private serverConfig: PluginsServerConfig,
private objectStorage: ObjectStorage
) {
const enabledTeamsString = this.serverConfig.SESSION_RECORDING_BLOB_PROCESSING_TEAMS
this.enabledTeams =
enabledTeamsString === 'all' || enabledTeamsString.trim().length === 0
? null
: enabledTeamsString.split(',').map((teamId) => parseInt(teamId, 10))
}
public async consume(event: IncomingRecordingMessage): Promise<void> {
const { team_id, session_id } = event
const key = `${team_id}-${session_id}`
const { partition, topic, offset } = event.metadata
if (!this.sessions.has(key)) {
const { partition, topic } = event.metadata
const sessionManager = new SessionManager(
this.serverConfig,
this.objectStorage.s3,
team_id,
session_id,
partition,
topic,
(offsets) => {
this.offsetManager?.removeOffsets(topic, partition, offsets)
// If the SessionManager is done (flushed and with no more queued events) then we remove it to free up memory
if (sessionManager.isEmpty) {
this.sessions.delete(key)
}
}
)
this.sessions.set(key, sessionManager)
}
this.offsetManager?.addOffset(topic, partition, offset)
await this.sessions.get(key)?.add(event)
// TODO: If we error here, what should we do...?
// If it is unrecoverable we probably want to remove the offset
// If it is recoverable, we probably want to retry?
}
public async handleKafkaMessage(message: Message): Promise<void> {
const statusWarn = (reason: string, error?: Error) => {
status.warn('⚠️', 'invalid_message', {
reason,
error,
partition: message.partition,
offset: message.offset,
})
}
if (!message.value) {
return statusWarn('empty')
}
let messagePayload: RawEventMessage
let event: PipelineEvent
try {
messagePayload = JSON.parse(message.value.toString())
event = JSON.parse(messagePayload.data)
} catch (error) {
return statusWarn('invalid_json', error)
}
if (event.event !== '$snapshot') {
status.debug('🙈', 'Received non-snapshot message, ignoring')
return
}
if (messagePayload.team_id == null && !messagePayload.token) {
return statusWarn('no_token')
}
let team: Team | null = null
if (messagePayload.team_id != null) {
team = await this.teamManager.fetchTeam(messagePayload.team_id)
} else if (messagePayload.token) {
team = await this.teamManager.getTeamByToken(messagePayload.token)
}
if (team == null) {
return statusWarn('team_not_found')
}
if (this.enabledTeams && !this.enabledTeams.includes(team.id)) {
// NOTE: due to the high volume of hits here we don't log this
return
}
status.info('⬆️', 'processing_session_recording_blob', { uuid: messagePayload.uuid })
const $snapshot_data = event.properties?.$snapshot_data
const recordingMessage: IncomingRecordingMessage = {
metadata: {
partition: message.partition,
topic: message.topic,
offset: message.offset,
},
team_id: team.id,
distinct_id: event.distinct_id,
session_id: event.properties?.$session_id,
window_id: event.properties?.$window_id,
// Properties data
chunk_id: $snapshot_data.chunk_id,
chunk_index: $snapshot_data.chunk_index,
chunk_count: $snapshot_data.chunk_count,
data: $snapshot_data.data,
compresssion: $snapshot_data.compression,
has_full_snapshot: $snapshot_data.has_full_snapshot,
events_summary: $snapshot_data.events_summary,
}
await this.consume(recordingMessage)
}
private async handleEachBatch(messages: Message[]): Promise<void> {
status.info('🔁', 'Processing recordings blob batch', { size: messages.length })
for (const message of messages) {
await this.handleKafkaMessage(message)
}
}
public async start(): Promise<void> {
status.info('🔁', 'Starting session recordings blob consumer')
// Currently we can't reuse any files stored on disk, so we opt to delete them all
rmSync(this.serverConfig.SESSION_RECORDING_LOCAL_DIRECTORY, { recursive: true, force: true })
mkdirSync(this.serverConfig.SESSION_RECORDING_LOCAL_DIRECTORY, { recursive: true })
status.info('🔁', 'Starting session recordings consumer')
const connectionConfig = createRdConnectionConfigFromEnvVars(this.serverConfig as KafkaConfig)
this.producer = await createKafkaProducer(connectionConfig)
// Create a node-rdkafka consumer that fetches batches of messages, runs
// eachBatchWithContext, then commits offsets for the batch.
this.batchConsumer = await startBatchConsumer({
connectionConfig,
groupId,
topic: KAFKA_SESSION_RECORDING_EVENTS,
sessionTimeout,
consumerMaxBytes: this.serverConfig.KAFKA_CONSUMPTION_MAX_BYTES,
consumerMaxBytesPerPartition: this.serverConfig.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
consumerMaxWaitMs: this.serverConfig.KAFKA_CONSUMPTION_MAX_WAIT_MS,
fetchBatchSize,
autoCommit: false,
eachBatch: async (messages) => {
return await this.handleEachBatch(messages)
},
})
this.offsetManager = new OffsetManager(this.batchConsumer.consumer)
this.batchConsumer.consumer.on('rebalance', async (err, assignments) => {
/**
* see https://github.com/Blizzard/node-rdkafka#rebalancing
*
* This event is received when the consumer group starts _or_ finishes rebalancing.
*
* Also, see https://docs.confluent.io/platform/current/clients/librdkafka/html/classRdKafka_1_1RebalanceCb.html
* For eager/non-cooperative partition.assignment.strategy assignors, such as range and roundrobin,
* the application must use assign() to set and unassign() to clear the entire assignment.
* For the cooperative assignors, such as cooperative-sticky, the application must use
* incremental_assign() for ERR__ASSIGN_PARTITIONS and incremental_unassign() for ERR__REVOKE_PARTITIONS.
*/
status.info('🏘️', 'Blob ingestion consumer rebalanced')
if (err.code === CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
status.info('⚖️', 'Blob ingestion consumer has received assignments', { assignments })
const partitions = assignments.map((assignment) => assignment.partition)
this.offsetManager?.cleanPartitions(KAFKA_SESSION_RECORDING_EVENTS, partitions)
await Promise.all(
[...this.sessions.values()]
.filter((session) => !partitions.includes(session.partition))
.map((session) => session.destroy())
)
// Assign partitions to the consumer
// TODO read offset position from partitions so we can read from the correct place
// TODO looking here https://github.com/Blizzard/node-rdkafka/blob/master/lib/kafka-consumer.js#L54
// TODO we should not need to handle the assignment ourself since rebalance_cb = true
// this.batchConsumer?.consumer.assign(assignments)
} else if (err.code === CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
status.info('⚖️', 'Blob ingestion consumer has had assignments revoked', { assignments })
/**
* The revoke_partitions event occurs when the Kafka Consumer is part of a consumer group and the group rebalances.
* As a result, some partitions previously assigned to a consumer might be taken away (revoked) and reassigned to another consumer.
* After the revoke_partitions event is handled, the consumer will receive an assign_partitions event,
* which will inform the consumer of the new set of partitions it is responsible for processing.
*
* Depending on why the rebalancing is occurring and the partition.assignment.strategy,
* A partition revoked here, may be assigned back to the same consumer.
*
* This is where we could act to reduce raciness/duplication when partitions are reassigned to different consumers
* e.g. stop the `flushInterval` and wait for the `assign_partitions` event to start it again.
*/
// this.batchConsumer?.consumer.unassign()
} else {
// We had a "real" error
status.error('🔥', 'Blob ingestion consumer rebalancing error', { err })
// TODO: immediately die? or just keep going?
}
})
// Make sure to disconnect the producer after we've finished consuming.
this.batchConsumer.join().finally(async () => {
if (this.producer && this.producer.isConnected()) {
status.debug('🔁', 'disconnecting kafka producer in session recordings batchConsumer finally')
await disconnectProducer(this.producer)
}
})
this.batchConsumer.consumer.on('disconnected', async (err) => {
// since we can't guarantee that the consumer will be stopped before some other code calls disconnect
// we need to listen for disconnect and make sure we're stopped
status.info('🔁', 'Blob ingestion consumer disconnected, cleaning up', { err })
await this.stop()
})
// We trigger the flushes from this level to reduce the number of running timers
this.flushInterval = setInterval(() => {
this.sessions.forEach((sessionManager) => {
void sessionManager.flushIfNeccessary()
})
}, 10000)
}
public async stop(): Promise<void> {
status.info('🔁', 'Stopping session recordings consumer')
if (this.flushInterval) {
clearInterval(this.flushInterval)
}
if (this.producer && this.producer.isConnected()) {
status.info('🔁', 'disconnecting kafka producer in session recordings batchConsumer stop')
await disconnectProducer(this.producer)
}
await this.batchConsumer?.stop()
// This is inefficient but currently necessary due to new instances restarting from the committed offset point
const destroyPromises: Promise<void>[] = []
this.sessions.forEach((sessionManager) => {
destroyPromises.push(sessionManager.destroy())
})
await Promise.allSettled(destroyPromises)
this.sessions = new Map()
}
}


@ -6,18 +6,18 @@ import {
KAFKA_PERFORMANCE_EVENTS,
KAFKA_SESSION_RECORDING_EVENTS,
KAFKA_SESSION_RECORDING_EVENTS_DLQ,
} from '../../config/kafka-topics'
import { startBatchConsumer } from '../../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars } from '../../kafka/config'
import { retryOnDependencyUnavailableError } from '../../kafka/error-handling'
import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../kafka/producer'
import { PipelineEvent, RawEventMessage, Team } from '../../types'
import { KafkaConfig } from '../../utils/db/hub'
import { status } from '../../utils/status'
import { createPerformanceEvent, createSessionRecordingEvent } from '../../worker/ingestion/process-event'
import { TeamManager } from '../../worker/ingestion/team-manager'
import { parseEventTimestamp } from '../../worker/ingestion/timestamps'
import { eventDroppedCounter } from './metrics'
} from '../../../config/kafka-topics'
import { startBatchConsumer } from '../../../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
import { retryOnDependencyUnavailableError } from '../../../kafka/error-handling'
import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../../kafka/producer'
import { PipelineEvent, RawEventMessage, Team } from '../../../types'
import { KafkaConfig } from '../../../utils/db/hub'
import { status } from '../../../utils/status'
import { createPerformanceEvent, createSessionRecordingEvent } from '../../../worker/ingestion/process-event'
import { TeamManager } from '../../../worker/ingestion/team-manager'
import { parseEventTimestamp } from '../../../worker/ingestion/timestamps'
import { eventDroppedCounter } from '../metrics'
export const startSessionRecordingEventsConsumer = async ({
teamManager,


@ -27,8 +27,10 @@ import { startJobsConsumer } from './ingestion-queues/jobs-consumer'
import { IngestionConsumer } from './ingestion-queues/kafka-queue'
import { startOnEventHandlerConsumer } from './ingestion-queues/on-event-handler-consumer'
import { startScheduledTasksConsumer } from './ingestion-queues/scheduled-tasks-consumer'
import { startSessionRecordingEventsConsumer } from './ingestion-queues/session-recordings-consumer'
import { SessionRecordingBlobIngester } from './ingestion-queues/session-recording/session-recordings-blob-consumer'
import { startSessionRecordingEventsConsumer } from './ingestion-queues/session-recording/session-recordings-consumer'
import { createHttpServer } from './services/http-server'
import { getObjectStorage } from './services/object_storage'
const { version } = require('../../package.json')
@ -87,7 +89,9 @@ export async function startPluginsServer(
// meantime.
let bufferConsumer: Consumer | undefined
let stopSessionRecordingEventsConsumer: (() => void) | undefined
let stopSessionRecordingBlobConsumer: (() => void) | undefined
let joinSessionRecordingEventsConsumer: ((timeout?: number) => Promise<void>) | undefined
let joinSessionRecordingBlobConsumer: ((timeout?: number) => Promise<void>) | undefined
let jobsConsumer: Consumer | undefined
let schedulerTasksConsumer: Consumer | undefined
@ -126,6 +130,7 @@ export async function startPluginsServer(
bufferConsumer?.disconnect(),
jobsConsumer?.disconnect(),
stopSessionRecordingEventsConsumer?.(),
stopSessionRecordingBlobConsumer?.(),
schedulerTasksConsumer?.disconnect(),
])
@ -419,6 +424,23 @@ export async function startPluginsServer(
healthChecks['session-recordings'] = isSessionRecordingsHealthy
}
if (capabilities.sessionRecordingBlobIngestion) {
const postgres = hub?.postgres ?? createPostgresPool(serverConfig.DATABASE_URL)
const teamManager = hub?.teamManager ?? new TeamManager(postgres, serverConfig)
const s3 = hub?.objectStorage ?? getObjectStorage(serverConfig)
if (!s3) {
throw new Error("Can't start session recording blob ingestion without object storage")
}
const ingester = new SessionRecordingBlobIngester(teamManager, serverConfig, s3)
await ingester.start()
const batchConsumer = ingester.batchConsumer
if (batchConsumer) {
stopSessionRecordingBlobConsumer = () => ingester.stop()
joinSessionRecordingBlobConsumer = () => batchConsumer.join()
healthChecks['session-recordings-blob'] = () => batchConsumer.isHealthy() ?? false
}
}
if (capabilities.http) {
httpServer = createHttpServer(healthChecks, analyticsEventsIngestionConsumer, piscina)
}
@ -444,6 +466,9 @@ export async function startPluginsServer(
if (joinSessionRecordingEventsConsumer) {
joinSessionRecordingEventsConsumer().catch(closeJobs)
}
if (joinSessionRecordingBlobConsumer) {
joinSessionRecordingBlobConsumer().catch(closeJobs)
}
return serverInstance ?? { stop: closeJobs }
} catch (error) {


@ -1,63 +1,69 @@
import { HeadBucketCommand, S3Client } from '@aws-sdk/client-s3'
import { PluginsServerConfig } from '../../types'
import { status } from '../../utils/status'
const aws = require('aws-sdk')
let S3: typeof aws.S3 | null = null
export interface ObjectStorage {
healthcheck: () => Promise<boolean>
s3: S3Client
}
let objectStorage: ObjectStorage | undefined
// Object Storage added without any uses to flush out deployment concerns.
// see https://github.com/PostHog/posthog/pull/9901
export const connectObjectStorage = (serverConfig: Partial<PluginsServerConfig>): ObjectStorage => {
let storage = {
healthcheck: async () => {
return Promise.resolve(true) // healthy if object storage isn't configured
},
}
try {
const {
OBJECT_STORAGE_ENDPOINT,
OBJECT_STORAGE_ACCESS_KEY_ID,
OBJECT_STORAGE_SECRET_ACCESS_KEY,
OBJECT_STORAGE_ENABLED,
OBJECT_STORAGE_BUCKET,
} = serverConfig
export const getObjectStorage = (serverConfig: Partial<PluginsServerConfig>): ObjectStorage | undefined => {
if (!objectStorage) {
try {
const {
OBJECT_STORAGE_ENDPOINT,
OBJECT_STORAGE_REGION,
OBJECT_STORAGE_ACCESS_KEY_ID,
OBJECT_STORAGE_SECRET_ACCESS_KEY,
OBJECT_STORAGE_ENABLED,
OBJECT_STORAGE_BUCKET,
} = serverConfig
if (OBJECT_STORAGE_ENABLED && !S3) {
S3 = new aws.S3({
endpoint: OBJECT_STORAGE_ENDPOINT,
accessKeyId: OBJECT_STORAGE_ACCESS_KEY_ID,
secretAccessKey: OBJECT_STORAGE_SECRET_ACCESS_KEY,
s3ForcePathStyle: true, // needed with minio?
signatureVersion: 'v4',
})
if (OBJECT_STORAGE_ENABLED) {
const credentials =
OBJECT_STORAGE_ACCESS_KEY_ID && OBJECT_STORAGE_SECRET_ACCESS_KEY
? {
accessKeyId: OBJECT_STORAGE_ACCESS_KEY_ID,
secretAccessKey: OBJECT_STORAGE_SECRET_ACCESS_KEY,
}
: undefined
storage = {
healthcheck: async () => {
if (!OBJECT_STORAGE_BUCKET) {
status.error('😢', 'No object storage bucket configured')
return false
}
const S3 = new S3Client({
region: OBJECT_STORAGE_REGION,
endpoint: OBJECT_STORAGE_ENDPOINT,
credentials,
forcePathStyle: true, // needed with minio?
// signatureVersion: 'v4',
})
try {
await S3.headBucket({
Bucket: OBJECT_STORAGE_BUCKET,
}).promise()
return true
} catch (error) {
status.error('💣', 'Could not access bucket:', error)
return false
}
},
objectStorage = {
healthcheck: async () => {
if (!OBJECT_STORAGE_BUCKET) {
status.error('😢', 'No object storage bucket configured')
return false
}
try {
await S3.send(new HeadBucketCommand({ Bucket: OBJECT_STORAGE_BUCKET }))
return true
} catch (error) {
status.error('💣', 'Could not access bucket:', error)
return false
}
},
s3: S3,
}
}
} catch (e) {
// only warn here... object storage is not mandatory until after #9901 at the earliest
status.warn('😢', 'could not initialise storage:', e)
}
} catch (e) {
// only warn here... object storage is not mandatory until after #9901 at the earliest
status.warn('😢', 'could not initialise storage:', e)
}
return storage
return objectStorage
}


@ -154,10 +154,10 @@ export interface PluginsServerConfig {
PERSON_INFO_CACHE_TTL: number
KAFKA_HEALTHCHECK_SECONDS: number
OBJECT_STORAGE_ENABLED: boolean // Disables or enables the use of object storage. It will become mandatory to use object storage
OBJECT_STORAGE_ENDPOINT: string // minio endpoint
OBJECT_STORAGE_REGION: string // s3 region
OBJECT_STORAGE_ENDPOINT: string // s3 endpoint
OBJECT_STORAGE_ACCESS_KEY_ID: string
OBJECT_STORAGE_SECRET_ACCESS_KEY: string
OBJECT_STORAGE_SESSION_RECORDING_FOLDER: string // the top level folder for storing session recordings inside the storage bucket
OBJECT_STORAGE_BUCKET: string // the object storage bucket name
PLUGIN_SERVER_MODE:
| 'ingestion'
@ -168,6 +168,7 @@ export interface PluginsServerConfig {
| 'scheduler'
| 'analytics-ingestion'
| 'recordings-ingestion'
| 'recordings-blob-ingestion'
| null
KAFKAJS_LOG_LEVEL: 'NOTHING' | 'DEBUG' | 'INFO' | 'WARN' | 'ERROR'
HISTORICAL_EXPORTS_ENABLED: boolean // enables historical exports for export apps
@ -180,6 +181,12 @@ export interface PluginsServerConfig {
EVENT_OVERFLOW_BUCKET_CAPACITY: number
EVENT_OVERFLOW_BUCKET_REPLENISH_RATE: number
CLOUD_DEPLOYMENT: string
SESSION_RECORDING_BLOB_PROCESSING_TEAMS: string
SESSION_RECORDING_LOCAL_DIRECTORY: string
SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS: number
SESSION_RECORDING_MAX_BUFFER_SIZE_KB: number
SESSION_RECORDING_REMOTE_FOLDER: string
}
export interface Hub extends PluginsServerConfig {
@ -236,6 +243,7 @@ export interface PluginServerCapabilities {
processPluginJobs?: boolean
processAsyncHandlers?: boolean
sessionRecordingIngestion?: boolean
sessionRecordingBlobIngestion?: boolean
http?: boolean
mmdb?: boolean
}


@ -15,7 +15,7 @@ import { getPluginServerCapabilities } from '../../capabilities'
import { defaultConfig } from '../../config/config'
import { KAFKAJS_LOG_LEVEL_MAPPING } from '../../config/constants'
import { KAFKA_JOBS } from '../../config/kafka-topics'
import { connectObjectStorage } from '../../main/services/object_storage'
import { getObjectStorage } from '../../main/services/object_storage'
import {
EnqueuedPluginJob,
Hub,
@ -160,11 +160,12 @@ export async function createHub(
status.info('👍', `Redis ready`)
status.info('🤔', `Connecting to object storage...`)
try {
connectObjectStorage(serverConfig)
const objectStorage = getObjectStorage(serverConfig)
if (objectStorage) {
status.info('👍', 'Object storage ready')
} catch (e) {
status.warn('🪣', `Object storage could not be created: ${e}`)
} else {
status.warn('🪣', `Object storage could not be created`)
}
const promiseManager = new PromiseManager(serverConfig, statsd)
@ -232,6 +233,7 @@ export async function createHub(
kafkaProducer,
statsd,
enqueuePluginJob,
objectStorage: objectStorage,
plugins: new Map(),
pluginConfigs: new Map(),


@ -94,6 +94,8 @@ function promptForMode(mode: PluginsServerConfig['PLUGIN_SERVER_MODE']): string
return 'INGESTION-OVERFLOW'
case 'recordings-ingestion':
return 'RECORDINGS-INGESTION'
case 'recordings-blob-ingestion':
return 'RECORDINGS-BLOB-INGESTION'
case 'async':
return 'ASYNC'
case 'exports':


@ -0,0 +1,105 @@
import { KafkaConsumer } from 'node-rdkafka'
import { OffsetManager } from '../../../../../src/main/ingestion-queues/session-recording/blob-ingester/offset-manager'
describe('offset-manager', () => {
const TOPIC = 'test-session-recordings'
let offsetManager: OffsetManager
const mockConsumer = {
commit: jest.fn(() => Promise.resolve()),
}
beforeEach(() => {
mockConsumer.commit.mockClear()
offsetManager = new OffsetManager(mockConsumer as unknown as KafkaConsumer)
})
it('collects new offsets', () => {
offsetManager.addOffset(TOPIC, 1, 1)
offsetManager.addOffset(TOPIC, 2, 1)
offsetManager.addOffset(TOPIC, 3, 4)
offsetManager.addOffset(TOPIC, 1, 2)
offsetManager.addOffset(TOPIC, 1, 5)
offsetManager.addOffset(TOPIC, 3, 4)
expect(offsetManager.offsetsByPartitionTopic).toEqual(
new Map([
['test-session-recordings-1', [1, 2, 5]],
['test-session-recordings-2', [1]],
['test-session-recordings-3', [4, 4]],
])
)
})
it('removes offsets', () => {
offsetManager.addOffset(TOPIC, 1, 1)
offsetManager.addOffset(TOPIC, 2, 1)
offsetManager.addOffset(TOPIC, 3, 4)
offsetManager.addOffset(TOPIC, 1, 2)
offsetManager.addOffset(TOPIC, 1, 5)
offsetManager.addOffset(TOPIC, 3, 4)
offsetManager.removeOffsets(TOPIC, 1, [1, 2])
expect(offsetManager.offsetsByPartitionTopic).toEqual(
new Map([
['test-session-recordings-1', [5]],
['test-session-recordings-2', [1]],
['test-session-recordings-3', [4, 4]],
])
)
})
it.each([
[[1], 1],
[[2, 5, 10], undefined],
[[1, 2, 3, 9], 3],
])('commits the appropriate offset', (removals: number[], expectation: number | null | undefined) => {
;[1, 2, 3, 4, 5, 6, 7, 8, 9, 10].forEach((offset) => {
offsetManager.addOffset(TOPIC, 1, offset)
})
const result = offsetManager.removeOffsets(TOPIC, 1, removals)
expect(result).toEqual(expectation)
if (result === undefined) {
expect(mockConsumer.commit).toHaveBeenCalledTimes(0)
} else {
expect(mockConsumer.commit).toHaveBeenCalledTimes(1)
expect(mockConsumer.commit).toHaveBeenCalledWith({
offset: result,
partition: 1,
topic: 'test-session-recordings',
})
}
})
it('does not commit revoked partition offsets', () => {
;[1, 2, 3, 4, 5, 6, 7, 8, 9, 10].forEach((offset) => {
offsetManager.addOffset(TOPIC, 1, offset)
})
offsetManager.addOffset(TOPIC, 1, 1)
offsetManager.addOffset(TOPIC, 2, 2)
offsetManager.addOffset(TOPIC, 3, 3)
offsetManager.cleanPartitions(TOPIC, [2, 3])
const resultOne = offsetManager.removeOffsets(TOPIC, 1, [1])
expect(resultOne).toEqual(undefined)
expect(mockConsumer.commit).toHaveBeenCalledTimes(0)
mockConsumer.commit.mockClear()
const resultTwo = offsetManager.removeOffsets(TOPIC, 2, [2])
expect(resultTwo).toEqual(2)
expect(mockConsumer.commit).toHaveBeenCalledTimes(1)
mockConsumer.commit.mockClear()
const resultThree = offsetManager.removeOffsets(TOPIC, 3, [3])
expect(resultThree).toEqual(3)
expect(mockConsumer.commit).toHaveBeenCalledTimes(1)
})
})


@ -0,0 +1,102 @@
import fs from 'node:fs'
import { defaultConfig } from '../../../../../src/config/config'
import { SessionManager } from '../../../../../src/main/ingestion-queues/session-recording/blob-ingester/session-manager'
import { compressToString } from '../../../../../src/main/ingestion-queues/session-recording/blob-ingester/utils'
import { createChunkedIncomingRecordingMessage, createIncomingRecordingMessage } from '../fixtures'
describe('session-manager', () => {
let sessionManager: SessionManager
const mockFinish = jest.fn()
const mockS3Client: any = {
send: jest.fn(),
}
beforeEach(() => {
sessionManager = new SessionManager(defaultConfig, mockS3Client, 1, 'session_id_1', 1, 'topic', mockFinish)
mockFinish.mockClear()
})
it('adds a message', async () => {
const payload = JSON.stringify([{ simple: 'data' }])
const event = createIncomingRecordingMessage({
data: compressToString(payload),
})
await sessionManager.add(event)
expect(sessionManager.buffer).toEqual({
count: 1,
createdAt: expect.any(Date),
file: expect.any(String),
id: expect.any(String),
size: 61, // The size of the event payload - this may change when test data changes
offsets: [1],
})
const fileContents = JSON.parse(fs.readFileSync(sessionManager.buffer.file, 'utf-8'))
expect(fileContents.data).toEqual(payload)
})
it('flushes messages', async () => {
const event = createIncomingRecordingMessage()
await sessionManager.add(event)
expect(sessionManager.buffer.count).toEqual(1)
const file = sessionManager.buffer.file
expect(fs.existsSync(file)).toEqual(true)
const afterResumeFlushPromise = sessionManager.flush()
expect(sessionManager.buffer.count).toEqual(0)
expect(sessionManager.flushBuffer?.count).toEqual(1)
await afterResumeFlushPromise
expect(sessionManager.flushBuffer).toEqual(undefined)
expect(mockFinish).toBeCalledTimes(1)
expect(fs.existsSync(file)).toEqual(false)
})
it('flushes messages whilst collecting new ones', async () => {
const event = createIncomingRecordingMessage()
const event2 = createIncomingRecordingMessage()
await sessionManager.add(event)
expect(sessionManager.buffer.count).toEqual(1)
const flushPromise = sessionManager.flush()
await sessionManager.add(event2)
expect(sessionManager.buffer.count).toEqual(1)
expect(sessionManager.flushBuffer?.count).toEqual(1)
await flushPromise
expect(sessionManager.flushBuffer).toEqual(undefined)
expect(sessionManager.buffer.count).toEqual(1)
})
it('chunks incoming messages', async () => {
const events = createChunkedIncomingRecordingMessage(3, {
data: compressToString(JSON.stringify([{ simple: 'data' }])),
})
expect(events.length).toEqual(3)
expect(events[0].data.length).toBeGreaterThan(1)
expect(events[1].data.length).toBeGreaterThan(1)
expect(events[2].data.length).toBeGreaterThan(1)
await sessionManager.add(events[0])
expect(sessionManager.buffer.count).toEqual(0)
expect(sessionManager.chunks.size).toEqual(1)
await sessionManager.add(events[2])
expect(sessionManager.buffer.count).toEqual(0)
expect(sessionManager.chunks.size).toEqual(1)
await sessionManager.add(events[1])
expect(sessionManager.buffer.count).toEqual(1)
expect(sessionManager.chunks.size).toEqual(0)
const fileContents = JSON.parse(fs.readFileSync(sessionManager.buffer.file, 'utf-8'))
expect(fileContents.data).toEqual('[{"simple":"data"}]')
})
})


@ -0,0 +1,22 @@
import {
compressToString,
decompressFromString,
} from '../../../../../src/main/ingestion-queues/session-recording/blob-ingester/utils'
// NOTE: This was copied from the output of the related python ingestion code
const compressedData =
'H4sIAFUyHGQC/1WOwQ6CMBBE36cYzh5QFMWf8AOMB6KQ9IASC4nE+OvqlK4kpNl2OjO7O9/PiRcJHQMtldCBBRlL3QlXSimlscHnudPz4DJ5V+ZtpXic/E7oJhz1OP9pv5wdhXUMxgUmNc5p5zxDmNdo25Faxwt15kh5c1bNfX5M3CjPP1/cudVbsFGtNTtjP3b/AMlNphkAAQAA'
describe('compression', () => {
it('should compress and decompress a string consistently', () => {
const compressedAndDecompressed = decompressFromString(compressToString('hello world'))
expect(compressedAndDecompressed).toEqual('hello world')
})
it('should decompress string from the python version', () => {
const decompressed = decompressFromString(compressedData)
expect(decompressed).toEqual(
`[{"type": 3, "data": {"source": 1, "positions": [{"x": 679, "y": 790, "id": 3, "timeOffset": 0}]}, "timestamp": 1679569492338}]`
)
})
})


@ -0,0 +1,316 @@
{
"event": "$snapshot",
"properties": {
"$snapshot_data": {
"type": 2,
"data": {
"node": {
"type": 0,
"childNodes": [
{ "type": 1, "name": "html", "publicId": "", "systemId": "", "id": 2 },
{
"type": 2,
"tagName": "html",
"attributes": { "lang": "en" },
"childNodes": [
{
"type": 2,
"tagName": "head",
"attributes": {},
"childNodes": [
{
"type": 2,
"tagName": "style",
"attributes": { "data-next-hide-fouc": "true" },
"childNodes": [
{
"type": 3,
"textContent": "body { display: none; }",
"isStyle": true,
"id": 6
}
],
"id": 5
},
{
"type": 2,
"tagName": "noscript",
"attributes": { "data-next-hide-fouc": "true" },
"childNodes": [
{
"type": 3,
"textContent": "<style>body{display:block}</style>",
"id": 8
}
],
"id": 7
},
{
"type": 2,
"tagName": "meta",
"attributes": { "charset": "utf-8" },
"childNodes": [],
"id": 9
},
{
"type": 2,
"tagName": "title",
"attributes": {},
"childNodes": [{ "type": 3, "textContent": "PostHog", "id": 11 }],
"id": 10
},
{
"type": 2,
"tagName": "meta",
"attributes": {
"name": "viewport",
"content": "width=device-width, initial-scale=1"
},
"childNodes": [],
"id": 12
},
{
"type": 2,
"tagName": "meta",
"attributes": { "name": "next-head-count", "content": "3" },
"childNodes": [],
"id": 13
},
{
"type": 2,
"tagName": "noscript",
"attributes": { "data-n-css": "" },
"childNodes": [],
"id": 14
},
{
"type": 2,
"tagName": "script",
"attributes": {
"defer": "",
"nomodule": "",
"src": "http://localhost:3001/_next/static/chunks/polyfills.js?ts=1679568313753"
},
"childNodes": [],
"id": 15
},
{
"type": 2,
"tagName": "script",
"attributes": {
"src": "http://localhost:3001/_next/static/chunks/webpack.js?ts=1679568313753",
"defer": ""
},
"childNodes": [],
"id": 16
},
{
"type": 2,
"tagName": "script",
"attributes": {
"src": "http://localhost:3001/_next/static/chunks/main.js?ts=1679568313753",
"defer": ""
},
"childNodes": [],
"id": 17
},
{
"type": 2,
"tagName": "script",
"attributes": {
"src": "http://localhost:3001/_next/static/chunks/pages/_app.js?ts=1679568313753",
"defer": ""
},
"childNodes": [],
"id": 18
},
{
"type": 2,
"tagName": "script",
"attributes": {
"src": "http://localhost:3001/_next/static/chunks/pages/index.js?ts=1679568313753",
"defer": ""
},
"childNodes": [],
"id": 19
},
{
"type": 2,
"tagName": "script",
"attributes": {
"src": "http://localhost:3001/_next/static/development/_buildManifest.js?ts=1679568313753",
"defer": ""
},
"childNodes": [],
"id": 20
},
{
"type": 2,
"tagName": "script",
"attributes": {
"src": "http://localhost:3001/_next/static/development/_ssgManifest.js?ts=1679568313753",
"defer": ""
},
"childNodes": [],
"id": 21
},
{
"type": 2,
"tagName": "style",
"attributes": {},
"childNodes": [
{
"type": 3,
"textContent": "main { margin: 0px auto; max-width: 1200px; padding: 2rem; font-family: helvetica, arial, sans-serif; }.buttons { display: flex; gap: 0.5rem; }",
"isStyle": true,
"id": 23
}
],
"id": 22
},
{
"type": 2,
"tagName": "noscript",
"attributes": { "id": "__next_css__DO_NOT_USE__" },
"childNodes": [],
"id": 24
}
],
"id": 4
},
{
"type": 2,
"tagName": "body",
"attributes": {},
"childNodes": [
{
"type": 2,
"tagName": "div",
"attributes": { "id": "__next" },
"childNodes": [
{
"type": 2,
"tagName": "main",
"attributes": {},
"childNodes": [
{
"type": 2,
"tagName": "h1",
"attributes": {},
"childNodes": [
{ "type": 3, "textContent": "PostHog React", "id": 29 }
],
"id": 28
},
{
"type": 2,
"tagName": "div",
"attributes": { "class": "buttons" },
"childNodes": [
{
"type": 2,
"tagName": "button",
"attributes": {},
"childNodes": [
{
"type": 3,
"textContent": "Capture event",
"id": 32
}
],
"id": 31
},
{
"type": 2,
"tagName": "button",
"attributes": { "data-attr": "autocapture-button" },
"childNodes": [
{
"type": 3,
"textContent": "Autocapture buttons",
"id": 34
}
],
"id": 33
},
{
"type": 2,
"tagName": "button",
"attributes": {
"class": "ph-no-capture",
"rr_width": "0px",
"rr_height": "0px"
},
"childNodes": [],
"id": 35
}
],
"id": 30
},
{
"type": 2,
"tagName": "p",
"attributes": {},
"childNodes": [
{
"type": 3,
"textContent": "Feature flag response: ",
"id": 37
}
],
"id": 36
}
],
"id": 27
}
],
"id": 26
},
{
"type": 2,
"tagName": "script",
"attributes": {
"type": "text/javascript",
"src": "http://localhost:8000/static/recorder.js?v=1.51.2"
},
"childNodes": [],
"id": 38
},
{
"type": 2,
"tagName": "script",
"attributes": {
"src": "http://localhost:3001/_next/static/chunks/react-refresh.js?ts=1679568313753"
},
"childNodes": [],
"id": 39
},
{
"type": 2,
"tagName": "script",
"attributes": { "id": "__NEXT_DATA__", "type": "application/json" },
"childNodes": [
{ "type": 3, "textContent": "SCRIPT_PLACEHOLDER", "id": 41 }
],
"id": 40
}
],
"id": 25
}
],
"id": 3
}
],
"id": 1
},
"initialOffset": { "left": 0, "top": 0 }
},
"timestamp": 1679568314158
},
"$session_id": "1870e0e4f40d73-07e335ea680392-1e525634-384000-1870e0e4f412a52",
"$window_id": "1870e0e4f422d99-042f3b7593666e-1e525634-384000-1870e0e4f432e59",
"token": "phc_6L6EWZMlGPXWTSbwpqChJM6IuR0XV4I98wMjgLD8Yds",
"distinct_id": "64H9Wag8cxxCaBtb4tTG1ENFQ8K9fqIQPm2zEq6JOgg"
},
"offset": 2984
}

View File

@ -0,0 +1,15 @@
{
"event": "$snapshot",
"properties": {
"$snapshot_data": {
"type": 4,
"data": { "href": "http://localhost:3001/", "width": 2560, "height": 832 },
"timestamp": 1679568314155
},
"$session_id": "1870e0e4f40d73-07e335ea680392-1e525634-384000-1870e0e4f412a52",
"$window_id": "1870e0e4f422d99-042f3b7593666e-1e525634-384000-1870e0e4f432e59",
"token": "phc_6L6EWZMlGPXWTSbwpqChJM6IuR0XV4I98wMjgLD8Yds",
"distinct_id": "64H9Wag8cxxCaBtb4tTG1ENFQ8K9fqIQPm2zEq6JOgg"
},
"offset": 2987
}

View File

@ -0,0 +1,59 @@
import { randomUUID } from 'node:crypto'
import { IncomingRecordingMessage } from '../../../../src/main/ingestion-queues/session-recording/blob-ingester/types'
import { compressToString } from '../../../../src/main/ingestion-queues/session-recording/blob-ingester/utils'
import jsonFullSnapshot from './data/snapshot-full.json'
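// Builds a representative single-chunk IncomingRecordingMessage for tests; the full-snapshot
// fixture is compressed via compressToString to match the 'gzip-base64' payloads sent by capture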
export function createIncomingRecordingMessage(data: Partial<IncomingRecordingMessage> = {}): IncomingRecordingMessage {
return {
metadata: {
topic: 'session_recording_events',
partition: 1,
offset: 1,
},
team_id: 1,
distinct_id: 'distinct_id',
session_id: 'session_id_1',
window_id: 'window_id_1',
// Fields parsed from the $snapshot event properties
chunk_id: 'chunk_id_1',
chunk_index: 0,
chunk_count: 1,
data: compressToString(JSON.stringify(jsonFullSnapshot)),
compresssion: 'gzip-base64',
has_full_snapshot: true,
events_summary: [
{
timestamp: 1679568043305,
type: 4,
data: { href: 'http://localhost:3001/', width: 2560, height: 1304 },
},
],
...data,
}
}
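// Splits one recording message into `chunks` slices that share a single chunk_id, so tests can
// exercise reassembly of multi-chunk payloads, e.g. createChunkedIncomingRecordingMessage(2)
// yields two messages with chunk_index 0 and 1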
export function createChunkedIncomingRecordingMessage(
chunks: number,
data: Partial<IncomingRecordingMessage> = {}
): IncomingRecordingMessage[] {
const chunkId = randomUUID()
const coreMessage = createIncomingRecordingMessage(data)
const chunkLength = coreMessage.data.length / chunks
const messages: IncomingRecordingMessage[] = []
// Clone the core message once per chunk, sharing a single chunk_id and splitting the data evenly across chunk indexes
for (let i = 0; i < chunks; i++) {
messages.push({
...coreMessage,
chunk_id: chunkId,
chunk_index: i,
chunk_count: chunks,
data: coreMessage.data.slice(i * chunkLength, (i + 1) * chunkLength),
})
}
return messages
}

View File

@ -0,0 +1,60 @@
import { waitForExpect } from '../../../../functional_tests/expectations'
import { defaultConfig } from '../../../../src/config/config'
import { SessionRecordingBlobIngester } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-blob-consumer'
import { Hub } from '../../../../src/types'
import { createHub } from '../../../../src/utils/db/hub'
import { UUIDT } from '../../../../src/utils/utils'
import { createIncomingRecordingMessage } from './fixtures'
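// Collects the partitions of every session the ingester is currently tracking and asserts the exact set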
function assertIngesterHasExpectedPartitions(ingester: SessionRecordingBlobIngester, expectedPartitions: number[]) {
const partitions: Set<number> = new Set()
ingester.sessions.forEach((session) => {
partitions.add(session.partition)
})
expect(Array.from(partitions)).toEqual(expectedPartitions)
}
describe('ingester rebalancing tests', () => {
let ingesterOne: SessionRecordingBlobIngester
let ingesterTwo: SessionRecordingBlobIngester
let hub: Hub
let closeHub: () => Promise<void>
beforeEach(async () => {
;[hub, closeHub] = await createHub()
})
afterEach(async () => {
await ingesterOne?.stop()
await ingesterTwo?.stop()
await closeHub()
})
it('rebalances partitions safely from one to two consumers', async () => {
ingesterOne = new SessionRecordingBlobIngester(hub.teamManager, defaultConfig, hub.objectStorage)
await ingesterOne.start()
await ingesterOne.consume(
createIncomingRecordingMessage({ session_id: new UUIDT().toString(), chunk_count: 2 })
)
await ingesterOne.consume(
createIncomingRecordingMessage({ session_id: new UUIDT().toString(), chunk_count: 2 })
)
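// Both fixture messages default to partition 1; with chunk_count: 2 and only one chunk sent,
// each session presumably stays open and so remains tracked across the rebalance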
await waitForExpect(() => {
assertIngesterHasExpectedPartitions(ingesterOne, [1])
})
ingesterTwo = new SessionRecordingBlobIngester(hub.teamManager, defaultConfig, hub.objectStorage)
await ingesterTwo.start()
await waitForExpect(() => {
assertIngesterHasExpectedPartitions(ingesterOne, [1])
// only one partition so nothing for the new consumer to do
assertIngesterHasExpectedPartitions(ingesterTwo, [])
})
})
})

View File

@ -0,0 +1,50 @@
import { defaultConfig } from '../../../../src/config/config'
import { SessionRecordingBlobIngester } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-blob-consumer'
import { Hub } from '../../../../src/types'
import { createHub } from '../../../../src/utils/db/hub'
import { createIncomingRecordingMessage } from './fixtures'
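// These tests drive the blob ingester directly through consume(), without a running Kafka
// consumer, and assert how entries in the in-memory session manager map are created and removed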
describe('ingester', () => {
let ingester: SessionRecordingBlobIngester
let hub: Hub
let closeHub: () => Promise<void>
beforeEach(async () => {
;[hub, closeHub] = await createHub()
})
afterEach(async () => {
await closeHub()
})
beforeEach(() => {
ingester = new SessionRecordingBlobIngester(hub.teamManager, defaultConfig, hub.objectStorage)
})
it('creates a new session manager if needed', async () => {
const event = createIncomingRecordingMessage()
await ingester.consume(event)
expect(ingester.sessions.size).toBe(1)
expect(ingester.sessions.has('1-session_id_1')).toEqual(true)
})
it('handles multiple incoming sessions', async () => {
const event = createIncomingRecordingMessage()
const event2 = createIncomingRecordingMessage({
session_id: 'session_id_2',
})
await Promise.all([ingester.consume(event), ingester.consume(event2)])
expect(ingester.sessions.size).toBe(2)
expect(ingester.sessions.has('1-session_id_1')).toEqual(true)
expect(ingester.sessions.has('1-session_id_2')).toEqual(true)
})
it('destroys a session manager if finished', async () => {
const event = createIncomingRecordingMessage()
await ingester.consume(event)
expect(ingester.sessions.has('1-session_id_1')).toEqual(true)
await ingester.sessions.get('1-session_id_1')?.flush()
expect(ingester.sessions.has('1-session_id_1')).toEqual(false)
})
})

View File

@ -1,10 +1,10 @@
import LibrdKafkaError from 'node-rdkafka/lib/error'
import { Pool } from 'pg'
import { defaultConfig } from '../../../src/config/config'
import { eachBatch } from '../../../src/main/ingestion-queues/session-recordings-consumer'
import { TeamManager } from '../../../src/worker/ingestion/team-manager'
import { createOrganization, createTeam } from '../../helpers/sql'
import { defaultConfig } from '../../../../src/config/config'
import { eachBatch } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-consumer'
import { TeamManager } from '../../../../src/worker/ingestion/team-manager'
import { createOrganization, createTeam } from '../../../helpers/sql'
describe('session-recordings-consumer', () => {
const producer = {