
feat: Replay events consumer (#16642)

Ben White 2023-07-20 16:41:25 +02:00 committed by GitHub
parent 910f88fd5e
commit d8df34f4ab
30 changed files with 548 additions and 504 deletions


@ -11,7 +11,6 @@
<env name="DATABASE_URL" value="postgres://posthog:posthog@localhost:5432/posthog" />
<env name="KAFKA_HOSTS" value="localhost:9092" />
<env name="OBJECT_STORAGE_ENABLED" value="True" />
<env name="SESSION_RECORDING_ENABLE_OFFSET_HIGH_WATER_MARK_PROCESSING" value="true" />
<env name="SESSION_RECORDING_KAFKA_BATCH_SIZE" value="200" />
<env name="SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS" value="180" />
<env name="SESSION_RECORDING_SUMMARY_INGESTION_ENABLED_TEAMS" value="all" />


@ -14,7 +14,6 @@
<env name="KEA_VERBOSE_LOGGING" value="false" />
<env name="PRINT_SQL" value="1" />
<env name="PYTHONUNBUFFERED" value="1" />
<env name="REPLAY_BLOB_INGESTION_TRAFFIC_RATIO" value="1" />
<env name="SESSION_RECORDING_KAFKA_COMPRESSION" value="gzip" />
<env name="SESSION_RECORDING_KAFKA_HOSTS" value="localhost" />
<env name="SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES" value="524288" />

.vscode/launch.json (vendored), 2 changed lines

@ -70,7 +70,7 @@
"DATABASE_URL": "postgres://posthog:posthog@localhost:5432/posthog",
"SKIP_SERVICE_VERSION_REQUIREMENTS": "1",
"PRINT_SQL": "1",
"REPLAY_BLOB_INGESTION_TRAFFIC_RATIO": "1.0"
"REPLAY_EVENTS_NEW_CONSUMER_RATIO": "1.0"
},
"console": "integratedTerminal",
"python": "${workspaceFolder}/env/bin/python",


@ -1,135 +0,0 @@
import { GetObjectCommand, GetObjectCommandOutput, ListObjectsV2Command, S3Client } from '@aws-sdk/client-s3'
import { readdir, readFile } from 'fs/promises'
import { join } from 'path'
import * as zlib from 'zlib'
import { defaultConfig } from '../src/config/config'
import { compressToString } from '../src/main/ingestion-queues/session-recording/blob-ingester/utils'
import { bufferFileDir } from '../src/main/ingestion-queues/session-recording/session-recordings-blob-consumer'
import { getObjectStorage } from '../src/main/services/object_storage'
import { UUIDT } from '../src/utils/utils'
import { capture, createOrganization, createTeam } from './api'
import { waitForExpect } from './expectations'
let organizationId: string
let s3: S3Client
function generateVeryLongString(length = 1025) {
return [...Array(length)].map(() => Math.random().toString(36)[2]).join('')
}
beforeAll(async () => {
organizationId = await createOrganization()
const objectStorage = getObjectStorage({
OBJECT_STORAGE_ENDPOINT: defaultConfig.OBJECT_STORAGE_ENDPOINT,
OBJECT_STORAGE_REGION: defaultConfig.OBJECT_STORAGE_REGION,
OBJECT_STORAGE_ACCESS_KEY_ID: defaultConfig.OBJECT_STORAGE_ACCESS_KEY_ID,
OBJECT_STORAGE_SECRET_ACCESS_KEY: defaultConfig.OBJECT_STORAGE_SECRET_ACCESS_KEY,
OBJECT_STORAGE_ENABLED: defaultConfig.OBJECT_STORAGE_ENABLED,
OBJECT_STORAGE_BUCKET: defaultConfig.OBJECT_STORAGE_BUCKET,
})
if (!objectStorage) {
throw new Error('S3 not configured')
}
s3 = objectStorage.s3
})
// having these tests is causing flapping failures in other tests :/
// eg.https://github.com/PostHog/posthog/actions/runs/4802953306/jobs/8553849494
test.skip(`single recording event writes data to local tmp file`, async () => {
const teamId = await createTeam(organizationId)
const distinctId = new UUIDT().toString()
const uuid = new UUIDT().toString()
const sessionId = new UUIDT().toString()
const veryLongString = generateVeryLongString()
await capture({
teamId,
distinctId,
uuid,
event: '$snapshot',
properties: {
$session_id: sessionId,
$window_id: 'abc1234',
$snapshot_data: { data: compressToString(veryLongString), chunk_count: 1 },
},
topic: 'session_recording_events',
})
let tempFiles: string[] = []
await waitForExpect(async () => {
const files = await readdir(bufferFileDir(defaultConfig.SESSION_RECORDING_LOCAL_DIRECTORY))
tempFiles = files.filter((f) => f.startsWith(`${teamId}.${sessionId}`))
expect(tempFiles.length).toBe(1)
})
await waitForExpect(async () => {
const currentFile = tempFiles[0]
const fileContents = await readFile(
join(bufferFileDir(defaultConfig.SESSION_RECORDING_LOCAL_DIRECTORY), currentFile),
'utf8'
)
expect(fileContents).toEqual(`{"window_id":"abc1234","data":"${veryLongString}"}\n`)
})
}, 40000)
test.skip(`multiple recording events writes compressed data to s3`, async () => {
const teamId = await createTeam(organizationId)
const distinctId = new UUIDT().toString()
const sessionId = new UUIDT().toString()
// need to send enough data to trigger the s3 upload exactly once.
// with a buffer of 1024, an estimated gzip compression of 0.1, and 1025 default length for generateAVeryLongString
// we need 25,000 events.
// if any of those things change then the number of events probably needs to change too
const captures = Array.from({ length: 25000 }).map(() => {
return capture({
teamId,
distinctId,
uuid: new UUIDT().toString(),
event: '$snapshot',
properties: {
$session_id: sessionId,
$window_id: 'abc1234',
$snapshot_data: { data: compressToString(generateVeryLongString()), chunk_count: 1 },
},
topic: 'session_recording_events',
})
})
await Promise.all(captures)
await waitForExpect(async () => {
const s3Files = await s3.send(
new ListObjectsV2Command({
Bucket: defaultConfig.OBJECT_STORAGE_BUCKET,
Prefix: `${defaultConfig.SESSION_RECORDING_REMOTE_FOLDER}/team_id/${teamId}/session_id/${sessionId}`,
})
)
expect(s3Files.Contents?.length).toBeGreaterThanOrEqual(1)
const s3File = s3Files.Contents?.[0]
if (!s3File) {
throw new Error('No s3File')
}
const s3FileContents: GetObjectCommandOutput = await s3.send(
new GetObjectCommand({
Bucket: defaultConfig.OBJECT_STORAGE_BUCKET,
Key: s3File.Key,
})
)
const fileStream = await s3FileContents.Body?.transformToByteArray()
if (!fileStream) {
throw new Error('No fileStream')
}
const text = zlib.gunzipSync(fileStream).toString().trim()
// text contains JSON for {
// "window_id": "abc1234",
// "data": "random...string" // thousands of characters
// }
expect(text).toMatch(/{"window_id":"abc1234","data":"\w+"}/)
}, 40000)
}, 50000)


@ -122,9 +122,6 @@ export function getDefaultConfig(): PluginsServerConfig {
USE_KAFKA_FOR_SCHEDULED_TASKS: true,
CLOUD_DEPLOYMENT: 'default', // Used as a Sentry tag
// this defaults to true, but is used to disable for testing in production
SESSION_RECORDING_ENABLE_OFFSET_HIGH_WATER_MARK_PROCESSING: true,
SESSION_RECORDING_KAFKA_HOSTS: undefined,
SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL: undefined,
SESSION_RECORDING_KAFKA_BATCH_SIZE: 500,
@ -143,7 +140,7 @@ export function getDefaultConfig(): PluginsServerConfig {
}
}
export const sessionRecordingBlobConsumerConfig = (config: PluginsServerConfig): PluginsServerConfig => {
export const sessionRecordingConsumerConfig = (config: PluginsServerConfig): PluginsServerConfig => {
// When running the blob consumer we override a bunch of settings to use the session recording ones if available
return {
...config,

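For orientation, here is a minimal sketch of the override pattern this helper applies, assuming two representative fields (KAFKA_HOSTS and KAFKA_SECURITY_PROTOCOL); the exact set of settings the real function overrides is not shown in this hunk.

// Hedged sketch: prefer the SESSION_RECORDING_* setting and fall back to the shared one.
type ConsumerConfigSketch = {
    KAFKA_HOSTS: string
    KAFKA_SECURITY_PROTOCOL?: string
    SESSION_RECORDING_KAFKA_HOSTS?: string
    SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL?: string
}

const sessionRecordingConsumerConfigSketch = (config: ConsumerConfigSketch): ConsumerConfigSketch => ({
    ...config,
    KAFKA_HOSTS: config.SESSION_RECORDING_KAFKA_HOSTS ?? config.KAFKA_HOSTS,
    KAFKA_SECURITY_PROTOCOL: config.SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL ?? config.KAFKA_SECURITY_PROTOCOL,
})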

@ -135,38 +135,54 @@ export const consumeMessages = async (consumer: RdKafkaConsumer, fetchBatchSize:
})
})
}
export const commitOffsetsForMessages = (messages: Message[], consumer: RdKafkaConsumer) => {
// Get the offsets for the last message for each partition, from
// messages
const offsets = messages.reduce((acc, message) => {
export const findOffsetsToCommit = (messages: TopicPartitionOffset[]): TopicPartitionOffset[] => {
// We only need to commit the highest offset for a batch of messages
const messagesByTopicPartition = messages.reduce((acc, message) => {
if (!acc[message.topic]) {
acc[message.topic] = {}
}
if (!acc[message.topic][message.partition.toString()]) {
acc[message.topic][message.partition.toString()] = message.offset
} else if (message.offset > acc[message.topic][message.partition.toString()]) {
acc[message.topic][message.partition.toString()] = message.offset
if (!acc[message.topic][message.partition]) {
acc[message.topic][message.partition] = []
}
return acc
}, {} as { [topic: string]: { [partition: string]: number } })
acc[message.topic][message.partition].push(message)
return acc
}, {} as { [topic: string]: { [partition: number]: TopicPartitionOffset[] } })
// Then we find the highest offset for each topic partition
const highestOffsets = Object.entries(messagesByTopicPartition).flatMap(([topic, partitions]) => {
return Object.entries(partitions).map(([partition, messages]) => {
const highestOffset = Math.max(...messages.map((message) => message.offset))
const topicPartitionOffsets = Object.entries(offsets).flatMap(([topic, partitions]) => {
return Object.entries(partitions).map(([partition, offset]) => {
return {
topic,
partition: parseInt(partition),
offset: offset + 1,
offset: highestOffset,
}
})
})
return highestOffsets
}
export const commitOffsetsForMessages = (messages: Message[], consumer: RdKafkaConsumer) => {
const topicPartitionOffsets = findOffsetsToCommit(messages).map((message) => {
return {
...message,
// When committing to Kafka you commit the offset of the next message you want to consume
offset: message.offset + 1,
}
})
if (topicPartitionOffsets.length > 0) {
status.debug('📝', 'Committing offsets', { topicPartitionOffsets })
consumer.commit(topicPartitionOffsets)
}
}
export const disconnectConsumer = async (consumer: RdKafkaConsumer) => {
await new Promise((resolve, reject) => {
consumer.disconnect((error, data) => {

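To make the commit logic above concrete, a self-contained sketch (local types only, not the module's real imports) of how a batch collapses to one offset per topic-partition, plus the +1 that Kafka commit semantics require.

type TPO = { topic: string; partition: number; offset: number }

// Keep only the highest offset seen for each topic-partition.
const findHighestOffsets = (messages: TPO[]): TPO[] => {
    const highest: Record<string, TPO> = {}
    for (const m of messages) {
        const key = `${m.topic}/${m.partition}`
        if (!highest[key] || m.offset > highest[key].offset) {
            highest[key] = m
        }
    }
    return Object.values(highest)
}

const batch: TPO[] = [
    { topic: 'recordings', partition: 0, offset: 10 },
    { topic: 'recordings', partition: 0, offset: 12 },
    { topic: 'recordings', partition: 0, offset: 11 },
    { topic: 'recordings', partition: 1, offset: 7 },
]

// The committed offset names the next message to consume, hence offset + 1.
const toCommit = findHighestOffsets(batch).map((tpo) => ({ ...tpo, offset: tpo.offset + 1 }))
// toCommit => partition 0 commits offset 13, partition 1 commits offset 8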

@ -1,13 +0,0 @@
import { DateTime } from 'luxon'
import { IncomingRecordingMessage, PersistedRecordingMessage } from './types'
export const convertToPersistedMessage = (message: IncomingRecordingMessage): PersistedRecordingMessage => {
return {
window_id: message.window_id,
data: message.events,
}
}
// Helper to return now as a milliseconds timestamp
export const now = () => DateTime.now().toMillis()


@ -10,7 +10,7 @@ export const offsetHighWaterMarkKey = (prefix: string, tp: TopicPartition) => {
return `${prefix}/${tp.topic}/${tp.partition}`
}
export type SessionOffsetHighWaterMarks = Record<string, number | undefined>
export type OffsetHighWaterMarks = Record<string, number | undefined>
/**
* If a file is written to S3 we need to know the offset of the last message in that file so that we can
@ -22,10 +22,10 @@ export type SessionOffsetHighWaterMarks = Record<string, number | undefined>
* which offsets have been written to S3 for each session, and which haven't
* so that we don't re-process those messages.
*/
export class SessionOffsetHighWaterMark {
export class OffsetHighWaterMarker {
// Watermarks are held in memory and synced back to redis on commit
// We don't need to load them more than once per TP as this consumer is the only thing writing to it
private topicPartitionWaterMarks: Record<string, Promise<SessionOffsetHighWaterMarks> | undefined> = {}
private topicPartitionWaterMarks: Record<string, Promise<OffsetHighWaterMarks> | undefined> = {}
constructor(private redisPool: RedisPool, private keyPrefix = '@posthog/replay/partition-high-water-marks') {}
@ -40,7 +40,7 @@ export class SessionOffsetHighWaterMark {
}
}
public async getWaterMarks(tp: TopicPartition): Promise<SessionOffsetHighWaterMarks> {
public async getWaterMarks(tp: TopicPartition): Promise<OffsetHighWaterMarks> {
const key = offsetHighWaterMarkKey(this.keyPrefix, tp)
// If we already have a watermark promise then we return it (i.e. we don't want to load the watermarks twice)
@ -50,17 +50,14 @@ export class SessionOffsetHighWaterMark {
).then((redisValue) => {
// NOTE: We do this in a secondary promise to release the previous redis client
// redisValue is an array of [sessionId, offset, sessionId, offset, ...]
// we want to convert it to an object of { sessionId: offset, sessionId: offset, ... }
const highWaterMarks = redisValue.reduce(
(acc: SessionOffsetHighWaterMarks, value: string, index: number) => {
if (index % 2 === 0) {
acc[value] = parseInt(redisValue[index + 1])
}
return acc
},
{}
)
// redisValue is an array of [key, offset, key, offset, ...]
// we want to convert it to an object of { key: offset, key: offset, ... }
const highWaterMarks = redisValue.reduce((acc: OffsetHighWaterMarks, value: string, index: number) => {
if (index % 2 === 0) {
acc[value] = parseInt(redisValue[index + 1])
}
return acc
}, {})
this.topicPartitionWaterMarks[key] = Promise.resolve(highWaterMarks)
@ -71,32 +68,37 @@ export class SessionOffsetHighWaterMark {
return this.topicPartitionWaterMarks[key]!
}
public async add(tp: TopicPartition, sessionId: string, offset: number): Promise<void> {
public async add(tp: TopicPartition, id: string, offset: number): Promise<void> {
const key = offsetHighWaterMarkKey(this.keyPrefix, tp)
const watermarks = await this.getWaterMarks(tp)
if (offset <= (watermarks[id] ?? -1)) {
// SANITY CHECK: We don't want to add an offset that is less than or equal to the current offset
return
}
// Immediately update the value so any subsequent calls to getWaterMarks will get the latest value
watermarks[id] = offset
this.topicPartitionWaterMarks[key] = Promise.resolve(watermarks)
try {
await this.run(`write offset high-water mark ${key} `, async (client) => {
const returnCountOfUpdatedAndAddedElements = 'CH'
const updatedCount = await client.zadd(key, returnCountOfUpdatedAndAddedElements, offset, sessionId)
const updatedCount = await client.zadd(key, returnCountOfUpdatedAndAddedElements, offset, id)
status.info('📝', 'WrittenOffsetCache added high-water mark for partition', {
key,
...tp,
sessionId,
id,
offset,
updatedCount,
})
}).then(async () => {
// NOTE: We do this in a secondary promise to release the previous redis client
const watermarks = await this.getWaterMarks(tp)
watermarks[sessionId] = offset
this.topicPartitionWaterMarks[key] = Promise.resolve(watermarks)
})
} catch (error) {
status.error('🧨', 'WrittenOffsetCache failed to add high-water mark for partition', {
error: error.message,
key,
...tp,
sessionId,
id,
offset,
})
captureException(error, {
@ -106,16 +108,16 @@ export class SessionOffsetHighWaterMark {
},
tags: {
...tp,
sessionId,
id,
},
})
}
}
public async onCommit(tp: TopicPartition, offset: number): Promise<void> {
public async clear(tp: TopicPartition, offset: number): Promise<void> {
const key = offsetHighWaterMarkKey(this.keyPrefix, tp)
try {
return await this.run(`commit all below offset high-water mark for ${key} `, async (client) => {
return await this.run(`clear all below offset high-water mark for ${key} `, async (client) => {
const numberRemoved = await client.zremrangebyscore(key, '-Inf', offset)
status.info('📝', 'WrittenOffsetCache committed all below high-water mark for partition', {
numberRemoved,
@ -124,9 +126,9 @@ export class SessionOffsetHighWaterMark {
})
const watermarks = await this.getWaterMarks(tp)
// remove each key in watermarks that has an offset less than or equal to the offset we just committed
Object.entries(watermarks).forEach(([sessionId, value]) => {
Object.entries(watermarks).forEach(([id, value]) => {
if (value && value <= offset) {
delete watermarks[sessionId]
delete watermarks[id]
}
})
@ -155,10 +157,10 @@ export class SessionOffsetHighWaterMark {
* it assumes that it has the latest high-water marks for this topic partition
* so that callers are safe to drop messages
*/
public async isBelowHighWaterMark(tp: TopicPartition, sessionId: string, offset: number): Promise<boolean> {
public async isBelowHighWaterMark(tp: TopicPartition, id: string, offset: number): Promise<boolean> {
const highWaterMarks = await this.getWaterMarks(tp)
return offset <= (highWaterMarks[sessionId] ?? -1)
return offset <= (highWaterMarks[id] ?? -1)
}
public revoke(tp: TopicPartition) {

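The real class above keeps its marks in Redis sorted sets; the following in-memory stand-in is only a sketch of the add / isBelowHighWaterMark / clear semantics the consumer relies on, with no Redis or error handling.

type TP = { topic: string; partition: number }

// In-memory sketch only: not the Redis-backed implementation.
class InMemoryHighWaterMarker {
    private marks: Record<string, Record<string, number>> = {}

    private key(tp: TP): string {
        return `${tp.topic}/${tp.partition}`
    }

    // Record that everything up to `offset` for this id has been persisted.
    public add(tp: TP, id: string, offset: number): void {
        const key = this.key(tp)
        const marks = this.marks[key] ?? (this.marks[key] = {})
        if (offset > (marks[id] ?? -1)) {
            marks[id] = offset
        }
    }

    // Offsets at or below the mark were already written, so the caller can drop them.
    public isBelowHighWaterMark(tp: TP, id: string, offset: number): boolean {
        return offset <= (this.marks[this.key(tp)]?.[id] ?? -1)
    }

    // After a Kafka commit, marks at or below the committed offset are no longer needed.
    public clear(tp: TP, offset: number): void {
        const marks = this.marks[this.key(tp)] ?? {}
        for (const [id, mark] of Object.entries(marks)) {
            if (mark <= offset) {
                delete marks[id]
            }
        }
    }
}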

@ -7,8 +7,8 @@ import { PluginsServerConfig, RedisPool } from '../../../../types'
import { timeoutGuard } from '../../../../utils/db/utils'
import { status } from '../../../../utils/status'
import { createRedis } from '../../../../utils/utils'
import { IncomingRecordingMessage } from './types'
import { convertToPersistedMessage } from './utils'
import { IncomingRecordingMessage } from '../types'
import { convertToPersistedMessage } from '../utils'
const Keys = {
snapshots(teamId: number, suffix: string): string {


@ -0,0 +1,187 @@
import { captureException, captureMessage } from '@sentry/node'
import { randomUUID } from 'crypto'
import { DateTime } from 'luxon'
import { HighLevelProducer as RdKafkaProducer, NumberNullUndefined } from 'node-rdkafka-acosom'
import { KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS } from '../../../../config/kafka-topics'
import { createRdConnectionConfigFromEnvVars } from '../../../../kafka/config'
import { findOffsetsToCommit } from '../../../../kafka/consumer'
import { retryOnDependencyUnavailableError } from '../../../../kafka/error-handling'
import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../../../kafka/producer'
import { PluginsServerConfig } from '../../../../types'
import { status } from '../../../../utils/status'
import { createSessionReplayEvent } from '../../../../worker/ingestion/process-event'
import { eventDroppedCounter } from '../../metrics'
import { IncomingRecordingMessage } from '../types'
import { OffsetHighWaterMarker } from './offset-high-water-marker'
const HIGH_WATERMARK_KEY = 'session_replay_events_ingester'
export class ReplayEventsIngester {
producer?: RdKafkaProducer
constructor(
private readonly serverConfig: PluginsServerConfig,
private readonly offsetHighWaterMarker: OffsetHighWaterMarker
) {}
public async consumeBatch(messages: IncomingRecordingMessage[]) {
const pendingProduceRequests: Promise<NumberNullUndefined>[] = []
for (const message of messages) {
const results = await retryOnDependencyUnavailableError(() => this.consume(message))
if (results) {
pendingProduceRequests.push(...results)
}
}
// On each loop, we flush the producer to ensure that all messages
// are sent to Kafka.
try {
await flushProducer(this.producer!)
} catch (error) {
// Rather than handling errors from flush, we instead handle
// errors per produce request, which gives us a little more
// flexibility in terms of deciding if it is a terminal
// error or not.
}
// We wait on all the produce requests to complete. After the
// flush they should all have been resolved/rejected already. If
// we get an intermittent error, such as a Kafka broker being
// unavailable, we will throw. We are relying on the Producer
// already having handled retries internally.
for (const produceRequest of pendingProduceRequests) {
try {
await produceRequest
} catch (error) {
status.error('🔁', 'main_loop_error', { error })
if (error?.isRetriable) {
// We assume that if the error is retriable, then we
// are probably in a state where e.g. Kafka is down
// temporarily and we would rather simply throw and
// have the process restarted.
throw error
}
}
}
const topicPartitionOffsets = findOffsetsToCommit(messages.map((message) => message.metadata))
await Promise.all(
topicPartitionOffsets.map((tpo) => this.offsetHighWaterMarker.add(tpo, HIGH_WATERMARK_KEY, tpo.offset))
)
}
public async consume(event: IncomingRecordingMessage): Promise<Promise<number | null | undefined>[] | void> {
const warn = (text: string, labels: Record<string, any> = {}) =>
status.warn('⚠️', text, {
offset: event.metadata.offset,
partition: event.metadata.partition,
...labels,
})
const drop = (reason: string, labels: Record<string, any> = {}) => {
eventDroppedCounter
.labels({
event_type: 'session_recordings_replay_events',
drop_cause: reason,
})
.inc()
warn(reason, {
reason,
...labels,
})
}
if (!this.producer) {
return drop('producer_not_ready')
}
if (event.replayIngestionConsumer !== 'v2') {
return drop('invalid_event_type')
}
if (
await this.offsetHighWaterMarker.isBelowHighWaterMark(
event.metadata,
HIGH_WATERMARK_KEY,
event.metadata.offset
)
) {
return drop('high_water_mark')
}
try {
const replayRecord = createSessionReplayEvent(
randomUUID(),
event.team_id,
event.distinct_id,
event.session_id,
event.events
)
try {
// the replay record timestamp has to be valid and be within a reasonable diff from now
if (replayRecord !== null) {
const asDate = DateTime.fromSQL(replayRecord.first_timestamp)
if (!asDate.isValid || Math.abs(asDate.diffNow('months').months) >= 0.99) {
captureMessage(`Invalid replay record timestamp: ${replayRecord.first_timestamp} for event`, {
extra: {
replayRecord,
uuid: replayRecord.uuid,
timestamp: replayRecord.first_timestamp,
},
tags: {
team: event.team_id,
session_id: replayRecord.session_id,
},
})
return drop('invalid_timestamp')
}
}
} catch (e) {
captureException(e, {
extra: {
replayRecord,
},
tags: {
team: event.team_id,
session_id: event.session_id,
},
})
return drop('session_replay_summarizer_error')
}
return [
produce({
producer: this.producer,
topic: KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS,
value: Buffer.from(JSON.stringify(replayRecord)),
key: event.session_id,
}),
]
} catch (error) {
status.error('⚠️', 'processing_error', {
error: error,
})
}
}
public async start(): Promise<void> {
const connectionConfig = createRdConnectionConfigFromEnvVars(this.serverConfig)
this.producer = await createKafkaProducer(connectionConfig)
this.producer.connect()
}
public async stop(): Promise<void> {
status.info('🔁', 'ReplayEventsIngester - stopping')
if (this.producer && this.producer.isConnected()) {
status.info('🔁', 'ReplayEventsIngester disconnecting kafka producer in batchConsumer stop')
await disconnectProducer(this.producer)
}
}
}

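For clarity, the drop decisions consume() makes above can be summarised as a pure function; this is a sketch that mirrors the checks in the diff (the option names are illustrative, the luxon calls are the same ones used above).

import { DateTime } from 'luxon'

// Returns a drop reason, or null if the replay record should be produced.
const replayDropReason = (opts: {
    producerReady: boolean
    replayIngestionConsumer: 'v1' | 'v2'
    belowHighWaterMark: boolean
    firstTimestamp: string // SQL-style timestamp, as stored on the replay record
}): string | null => {
    if (!opts.producerReady) {
        return 'producer_not_ready'
    }
    if (opts.replayIngestionConsumer !== 'v2') {
        return 'invalid_event_type'
    }
    if (opts.belowHighWaterMark) {
        return 'high_water_mark'
    }
    const asDate = DateTime.fromSQL(opts.firstTimestamp)
    // Invalid, or roughly a month away from now in either direction: drop it.
    if (!asDate.isValid || Math.abs(asDate.diffNow('months').months) >= 0.99) {
        return 'invalid_timestamp'
    }
    return null
}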

@ -12,10 +12,9 @@ import { PluginsServerConfig } from '../../../../types'
import { asyncTimeoutGuard, timeoutGuard } from '../../../../utils/db/utils'
import { status } from '../../../../utils/status'
import { ObjectStorage } from '../../../services/object_storage'
import { bufferFileDir } from '../session-recordings-blob-consumer'
import { IncomingRecordingMessage } from '../types'
import { bufferFileDir, convertToPersistedMessage, maxDefined, minDefined, now } from '../utils'
import { RealtimeManager } from './realtime-manager'
import { IncomingRecordingMessage } from './types'
import { convertToPersistedMessage, now } from './utils'
const BUCKETS_LINES_WRITTEN = [0, 10, 50, 100, 500, 1000, 2000, 5000, 10000, Infinity]
const BUCKETS_KB_WRITTEN = [0, 128, 512, 1024, 5120, 10240, 20480, 51200, 102400, 204800, Infinity]
@ -77,8 +76,8 @@ type SessionBuffer = {
file: string
fileStream: WriteStream
offsets: {
lowest: number
highest: number
lowest?: number
highest?: number
}
eventsRange: {
firstTimestamp: number
@ -350,7 +349,9 @@ export class SessionManager {
// We want to delete the flush buffer before we proceed so that the onFinish handler doesn't reference it
void this.destroyBuffer(this.flushBuffer)
this.flushBuffer = undefined
this.onFinish([offsets.lowest, offsets.highest])
if (offsets.lowest && offsets.highest) {
this.onFinish([offsets.lowest, offsets.highest])
}
} catch (error) {
this.captureException(error)
} finally {
@ -374,10 +375,7 @@ export class SessionManager {
newestKafkaTimestamp: null,
file,
fileStream: createWriteStream(file, 'utf-8'),
offsets: {
lowest: Infinity,
highest: -Infinity,
},
offsets: {},
eventsRange: null,
}
@ -419,8 +417,8 @@ export class SessionManager {
const content = JSON.stringify(messageData) + '\n'
this.buffer.count += 1
this.buffer.sizeEstimate += content.length
this.buffer.offsets.lowest = Math.min(this.buffer.offsets.lowest, message.metadata.offset)
this.buffer.offsets.highest = Math.max(this.buffer.offsets.highest, message.metadata.offset)
this.buffer.offsets.lowest = minDefined(this.buffer.offsets.lowest, message.metadata.offset)
this.buffer.offsets.highest = maxDefined(this.buffer.offsets.highest, message.metadata.offset)
if (this.realtime) {
// We don't care about the response here as it is an optimistic call
@ -435,7 +433,7 @@ export class SessionManager {
}
private setEventsRangeFrom(message: IncomingRecordingMessage) {
const start = message.events.at(0)?.timestamp
const end = message.events.at(-1)?.timestamp
const end = message.events.at(-1)?.timestamp ?? start
if (!start || !end) {
captureMessage(
@ -451,8 +449,8 @@ export class SessionManager {
return
}
const firstTimestamp = Math.min(start, this.buffer.eventsRange?.firstTimestamp || Infinity)
const lastTimestamp = Math.max(end || start, this.buffer.eventsRange?.lastTimestamp || -Infinity)
const firstTimestamp = minDefined(start, this.buffer.eventsRange?.firstTimestamp) ?? start
const lastTimestamp = maxDefined(end, this.buffer.eventsRange?.lastTimestamp) ?? end
this.buffer.eventsRange = { firstTimestamp, lastTimestamp }
}
@ -504,10 +502,7 @@ export class SessionManager {
}
public getLowestOffset(): number | null {
if (this.buffer.count === 0) {
return null
}
return Math.min(this.buffer.offsets.lowest, this.flushBuffer?.offsets.lowest ?? Infinity)
return minDefined(this.buffer.offsets.lowest, this.flushBuffer?.offsets.lowest) ?? null
}
private async destroyBuffer(buffer: SessionBuffer): Promise<void> {


@ -27,7 +27,7 @@ import { TeamManager } from '../../../worker/ingestion/team-manager'
import { parseEventTimestamp } from '../../../worker/ingestion/timestamps'
import { eventDroppedCounter } from '../metrics'
export const startSessionRecordingEventsConsumer = async ({
export const startSessionRecordingEventsConsumerV1 = async ({
teamManager,
kafkaConfig,
consumerMaxBytes,
@ -273,19 +273,23 @@ const eachMessage =
team.id,
messagePayload.distinct_id,
parseEventTimestamp(event as PluginEvent),
event.ip,
event.properties || {}
)
let replayRecord: null | SummarizedSessionRecordingEvent = null
try {
replayRecord = createSessionReplayEvent(
messagePayload.uuid,
team.id,
messagePayload.distinct_id,
event.ip,
event.properties || {}
)
const properties = event.properties || {}
const shouldCreateReplayEvents = (properties['$snapshot_consumer'] ?? 'v1') === 'v1'
if (shouldCreateReplayEvents && properties.$snapshot_data?.events_summary.length) {
replayRecord = createSessionReplayEvent(
messagePayload.uuid,
team.id,
messagePayload.distinct_id,
properties['$session_id'],
properties.$snapshot_data?.events_summary || []
)
}
// the replay record timestamp has to be valid and be within a reasonable diff from now
if (replayRecord !== null) {
const asDate = DateTime.fromSQL(replayRecord.first_timestamp)

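During the migration both consumers inspect the same property so that each snapshot batch is summarised exactly once; a sketch of that routing ($snapshot_consumer comes from the diff, the helper itself is illustrative).

// Which pipeline turns this $snapshot message into a session_replay_event?
const replayEventOwner = (properties: Record<string, any>): 'v1' | 'v2' => {
    // Untagged traffic defaults to the old consumer.
    return (properties['$snapshot_consumer'] ?? 'v1') === 'v2' ? 'v2' : 'v1'
}
// v1 consumer: creates the replay event only when replayEventOwner(...) === 'v1'.
// v2 consumer (ReplayEventsIngester): drops anything else as invalid_event_type.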

@ -1,22 +1,14 @@
import * as Sentry from '@sentry/node'
import { captureException } from '@sentry/node'
import { mkdirSync, rmSync } from 'node:fs'
import {
CODES,
features,
HighLevelProducer as RdKafkaProducer,
librdkafkaVersion,
Message,
TopicPartition,
} from 'node-rdkafka-acosom'
import path from 'path'
import { CODES, features, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka-acosom'
import { Pool } from 'pg'
import { Counter, Gauge, Histogram } from 'prom-client'
import { sessionRecordingConsumerConfig } from '../../../config/config'
import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics'
import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
import { createKafkaProducer, disconnectProducer } from '../../../kafka/producer'
import { PipelineEvent, PluginsServerConfig, RawEventMessage, RedisPool, TeamId } from '../../../types'
import { BackgroundRefresher } from '../../../utils/background-refresher'
import { status } from '../../../utils/status'
@ -24,11 +16,12 @@ import { fetchTeamTokensWithRecordings } from '../../../worker/ingestion/team-ma
import { ObjectStorage } from '../../services/object_storage'
import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics'
import { eventDroppedCounter } from '../metrics'
import { RealtimeManager } from './blob-ingester/realtime-manager'
import { SessionManager } from './blob-ingester/session-manager'
import { SessionOffsetHighWaterMark } from './blob-ingester/session-offset-high-water-mark'
import { IncomingRecordingMessage } from './blob-ingester/types'
import { now } from './blob-ingester/utils'
import { OffsetHighWaterMarker } from './services/offset-high-water-marker'
import { RealtimeManager } from './services/realtime-manager'
import { ReplayEventsIngester } from './services/replay-events-ingester'
import { SessionManager } from './services/session-manager'
import { IncomingRecordingMessage } from './types'
import { bufferFileDir, now } from './utils'
// Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals
require('@sentry/tracing')
@ -37,8 +30,6 @@ const groupId = 'session-recordings-blob'
const sessionTimeout = 30000
const flushIntervalTimeoutMs = 30000
export const bufferFileDir = (root: string) => path.join(root, 'session-buffer-files')
const gaugeSessionsHandled = new Gauge({
name: 'recording_blob_ingestion_session_manager_count',
help: 'A gauge of the number of sessions being handled by this blob ingestion consumer',
@ -84,12 +75,12 @@ const counterKafkaMessageReceived = new Counter({
labelNames: ['partition'],
})
export class SessionRecordingBlobIngester {
export class SessionRecordingIngesterV2 {
sessions: Record<string, SessionManager> = {}
sessionOffsetHighWaterMark?: SessionOffsetHighWaterMark
offsetHighWaterMarker: OffsetHighWaterMarker
realtimeManager: RealtimeManager
replayEventsIngester: ReplayEventsIngester
batchConsumer?: BatchConsumer
producer?: RdKafkaProducer
flushInterval: NodeJS.Timer | null = null
// the time at the most recent message of a particular partition
partitionNow: Record<number, number | null> = {}
@ -104,12 +95,12 @@ export class SessionRecordingBlobIngester {
) {
this.realtimeManager = new RealtimeManager(this.redisPool, this.serverConfig)
if (this.serverConfig.SESSION_RECORDING_ENABLE_OFFSET_HIGH_WATER_MARK_PROCESSING) {
this.sessionOffsetHighWaterMark = new SessionOffsetHighWaterMark(
this.redisPool,
serverConfig.SESSION_RECORDING_REDIS_OFFSET_STORAGE_KEY
)
}
this.offsetHighWaterMarker = new OffsetHighWaterMarker(
this.redisPool,
serverConfig.SESSION_RECORDING_REDIS_OFFSET_STORAGE_KEY
)
this.replayEventsIngester = new ReplayEventsIngester(this.serverConfig, this.offsetHighWaterMarker)
this.teamsRefresher = new BackgroundRefresher(async () => {
try {
@ -150,7 +141,7 @@ export class SessionRecordingBlobIngester {
op: 'checkHighWaterMark',
})
if (await this.sessionOffsetHighWaterMark?.isBelowHighWaterMark({ topic, partition }, session_id, offset)) {
if (await this.offsetHighWaterMarker.isBelowHighWaterMark({ topic, partition }, session_id, offset)) {
eventDroppedCounter
.labels({
event_type: 'session_recordings_blob_ingestion',
@ -180,7 +171,7 @@ export class SessionRecordingBlobIngester {
this.commitOffsets(topic, partition, session_id, offsets)
// We don't want to block if anything fails here. Watermarks are best effort
void this.sessionOffsetHighWaterMark?.add({ topic, partition }, session_id, offsets.slice(-1)[0])
void this.offsetHighWaterMarker.add({ topic, partition }, session_id, offsets.slice(-1)[0])
}
)
@ -199,7 +190,7 @@ export class SessionRecordingBlobIngester {
// If it is recoverable, we probably want to retry?
}
public async handleKafkaMessage(message: Message, span?: Sentry.Span): Promise<void> {
public async parseKafkaMessage(message: Message): Promise<IncomingRecordingMessage | void> {
const statusWarn = (reason: string, extra?: Record<string, any>) => {
status.warn('⚠️', 'invalid_message', {
reason,
@ -236,13 +227,9 @@ export class SessionRecordingBlobIngester {
let teamId: TeamId | null = null
const token = messagePayload.token
const teamSpan = span?.startChild({
op: 'fetchTeam',
})
if (token) {
teamId = await this.teamsRefresher.get().then((teams) => teams[token] || null)
}
teamSpan?.finish()
if (teamId == null) {
eventDroppedCounter
@ -267,17 +254,14 @@ export class SessionRecordingBlobIngester {
},
team_id: teamId,
distinct_id: event.distinct_id,
distinct_id: event.properties.distinct_id,
session_id: event.properties?.$session_id,
window_id: event.properties?.$window_id,
events: event.properties.$snapshot_items,
replayIngestionConsumer: event.properties?.$snapshot_consumer ?? 'v1',
}
const consumeSpan = span?.startChild({
op: 'consume',
})
await this.consume(recordingMessage, consumeSpan)
consumeSpan?.finish()
return recordingMessage
}
private async handleEachBatch(messages: Message[]): Promise<void> {
@ -285,15 +269,20 @@ export class SessionRecordingBlobIngester {
histogramKafkaBatchSize.observe(messages.length)
await Promise.all(
messages.map(async (message) => {
const childSpan = transaction.startChild({
op: 'handleKafkaMessage',
})
await this.handleKafkaMessage(message, childSpan)
childSpan.finish()
const recordingMessages: IncomingRecordingMessage[] = (
await Promise.all(messages.map((m) => this.parseKafkaMessage(m)))
).filter((message) => message) as IncomingRecordingMessage[]
for (const message of recordingMessages) {
const consumeSpan = transaction?.startChild({
op: 'blobConsume',
})
)
await this.consume(message, consumeSpan)
consumeSpan?.finish()
}
await this.replayEventsIngester.consumeBatch(recordingMessages)
transaction.finish()
}
@ -317,11 +306,14 @@ export class SessionRecordingBlobIngester {
// Load teams into memory
await this.teamsRefresher.refresh()
await this.replayEventsIngester.start()
const connectionConfig = createRdConnectionConfigFromEnvVars(this.serverConfig)
this.producer = await createKafkaProducer(connectionConfig)
// Create a node-rdkafka consumer that fetches batches of messages, runs
// eachBatchWithContext, then commits offsets for the batch.
const recordingConsumerConfig = sessionRecordingConsumerConfig(this.serverConfig)
this.batchConsumer = await startBatchConsumer({
connectionConfig,
groupId,
@ -330,14 +322,14 @@ export class SessionRecordingBlobIngester {
// the largest size of a message that can be fetched by the consumer.
// the largest size our MSK cluster allows is 20MB
// we only use 9 or 10MB but there's no reason to limit this 🤷️
consumerMaxBytes: this.serverConfig.KAFKA_CONSUMPTION_MAX_BYTES,
consumerMaxBytesPerPartition: this.serverConfig.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
consumerMaxBytes: recordingConsumerConfig.KAFKA_CONSUMPTION_MAX_BYTES,
consumerMaxBytesPerPartition: recordingConsumerConfig.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
// our messages are very big, so we don't want to buffer too many
queuedMinMessages: this.serverConfig.SESSION_RECORDING_KAFKA_QUEUE_SIZE,
consumerMaxWaitMs: this.serverConfig.KAFKA_CONSUMPTION_MAX_WAIT_MS,
consumerErrorBackoffMs: this.serverConfig.KAFKA_CONSUMPTION_ERROR_BACKOFF_MS,
fetchBatchSize: this.serverConfig.SESSION_RECORDING_KAFKA_BATCH_SIZE,
batchingTimeoutMs: this.serverConfig.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS,
queuedMinMessages: recordingConsumerConfig.SESSION_RECORDING_KAFKA_QUEUE_SIZE,
consumerMaxWaitMs: recordingConsumerConfig.KAFKA_CONSUMPTION_MAX_WAIT_MS,
consumerErrorBackoffMs: recordingConsumerConfig.KAFKA_CONSUMPTION_ERROR_BACKOFF_MS,
fetchBatchSize: recordingConsumerConfig.SESSION_RECORDING_KAFKA_BATCH_SIZE,
batchingTimeoutMs: recordingConsumerConfig.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS,
autoCommit: false,
eachBatch: async (messages) => {
return await this.handleEachBatch(messages)
@ -388,7 +380,7 @@ export class SessionRecordingBlobIngester {
gaugeLagMilliseconds.remove({ partition })
gaugeOffsetCommitted.remove({ partition })
gaugeOffsetCommitFailed.remove({ partition })
this.sessionOffsetHighWaterMark?.revoke(topicPartition)
this.offsetHighWaterMarker.revoke(topicPartition)
this.partitionNow[partition] = null
this.partitionLastKnownCommit[partition] = null
})
@ -402,14 +394,8 @@ export class SessionRecordingBlobIngester {
})
// Make sure to disconnect the producer after we've finished consuming.
this.batchConsumer.join().finally(async () => {
if (this.producer && this.producer.isConnected()) {
status.debug(
'🔁',
'blob_ingester_consumer disconnecting kafka producer in session recordings batchConsumer finally'
)
await disconnectProducer(this.producer)
}
this.batchConsumer.join().finally(() => {
status.debug('🔁', 'blob_ingester_consumer - batch consumer has finished')
})
this.batchConsumer.consumer.on('disconnected', async (err) => {
@ -469,11 +455,7 @@ export class SessionRecordingBlobIngester {
}
await this.realtimeManager.unsubscribe()
if (this.producer && this.producer.isConnected()) {
status.info('🔁', 'blob_ingester_consumer disconnecting kafka producer in batchConsumer stop')
await disconnectProducer(this.producer)
}
await this.replayEventsIngester.stop()
await this.batchConsumer?.stop()
// This is inefficient but currently necessary due to new instances restarting from the committed offset point
@ -540,7 +522,7 @@ export class SessionRecordingBlobIngester {
offsetToCommit: highestOffsetToCommit,
})
void this.sessionOffsetHighWaterMark?.onCommit({ topic, partition }, highestOffsetToCommit)
void this.offsetHighWaterMarker.clear({ topic, partition }, highestOffsetToCommit)
try {
this.batchConsumer?.consumer.commit({


@ -6,38 +6,32 @@
* Any changes may need to be sync'd between the two
*/
import { RRWebEvent } from '../../../types'
const activeSources = [1, 2, 3, 4, 5, 6, 7, 12]
const ACTIVITY_THRESHOLD_MS = 5000
export interface RRWebEventSummaryData {
export interface RRWebPartialData {
href?: string
source?: number
payload?: Record<string, any>
plugin?: string
}
export interface RRWebEventSummary {
timestamp: number
type: number
data: RRWebEventSummaryData
windowId: string
}
interface RecordingSegment {
kind: 'window' | 'buffer' | 'gap'
startTimestamp: number // Epoch time that the segment starts
endTimestamp: number // Epoch time that the segment ends
durationMs: number
windowId?: string
isActive: boolean
}
const isActiveEvent = (event: RRWebEventSummary): boolean => {
const isActiveEvent = (event: RRWebEvent): boolean => {
return event.type === 3 && activeSources.includes(event.data?.source || -1)
}
const createSegments = (snapshots: RRWebEventSummary[]): RecordingSegment[] => {
const createSegments = (snapshots: RRWebEvent[]): RecordingSegment[] => {
let segments: RecordingSegment[] = []
let activeSegment!: Partial<RecordingSegment>
let lastActiveEventTimestamp = 0
@ -60,11 +54,6 @@ const createSegments = (snapshots: RRWebEventSummary[]): RecordingSegment[] => {
isNewSegment = true
}
// 4. If windowId changes we create a new segment
if (activeSegment?.windowId !== snapshot.windowId) {
isNewSegment = true
}
if (isNewSegment) {
if (activeSegment) {
segments.push(activeSegment as RecordingSegment)
@ -73,7 +62,6 @@ const createSegments = (snapshots: RRWebEventSummary[]): RecordingSegment[] => {
activeSegment = {
kind: 'window',
startTimestamp: snapshot.timestamp,
windowId: snapshot.windowId,
isActive: eventIsActive,
}
}
@ -98,7 +86,7 @@ const createSegments = (snapshots: RRWebEventSummary[]): RecordingSegment[] => {
* TODO add code sharing between plugin-server and front-end so that this method can
* call the same createSegments function as the front-end
*/
export const activeMilliseconds = (snapshots: RRWebEventSummary[]): number => {
export const activeMilliseconds = (snapshots: RRWebEvent[]): number => {
const segments = createSegments(snapshots)
return segments.reduce((acc, segment) => {
if (segment.isActive) {

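As a small usage note for the predicate above (a sketch with an assumed minimal event shape): rrweb type 3 is an incremental snapshot and data.source 2 is a mouse interaction, so a click counts as activity while a full snapshot does not.

// Assumed minimal event shape for illustration.
type RRWebEventSketch = { type: number; timestamp: number; data?: { source?: number } }

const activeSourcesSketch = [1, 2, 3, 4, 5, 6, 7, 12]

const isActiveEventSketch = (event: RRWebEventSketch): boolean =>
    event.type === 3 && activeSourcesSketch.includes(event.data?.source ?? -1)

const clickIsActive = isActiveEventSketch({ type: 3, timestamp: 1, data: { source: 2 } }) // true
const fullSnapshotIsActive = isActiveEventSketch({ type: 2, timestamp: 1 }) // false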

@ -1,16 +1,11 @@
// This is the incoming message from Kafka
export type RRWebEvent = Record<string, any> & {
timestamp: number
type: number
data: any
}
import { TopicPartitionOffset } from 'node-rdkafka-acosom'
import { RRWebEvent } from '../../../types'
export type IncomingRecordingMessage = {
metadata: {
topic: string
partition: number
offset: number
metadata: TopicPartitionOffset & {
timestamp: number
}
@ -19,6 +14,8 @@ export type IncomingRecordingMessage = {
session_id: string
window_id?: string
events: RRWebEvent[]
// NOTE: This is only for migrating from one consumer to the other
replayIngestionConsumer: 'v1' | 'v2'
}
// This is the incoming message from Kafka

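An illustrative instance of the updated type (all values are made up): metadata now reuses node-rdkafka's TopicPartitionOffset plus a timestamp, and the new replayIngestionConsumer flag carries the migration routing.

// Shape matches IncomingRecordingMessage; values are illustrative only.
const exampleMessage = {
    metadata: { topic: 'snapshot_items', partition: 0, offset: 42, timestamp: 1689860000000 },
    team_id: 1,
    distinct_id: 'user-123',
    session_id: 'session-abc',
    window_id: 'window-1',
    events: [{ type: 3, timestamp: 1689860000000, data: { source: 2 } }],
    replayIngestionConsumer: 'v2', // routes the message to the new ReplayEventsIngester
}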

@ -0,0 +1,26 @@
import { DateTime } from 'luxon'
import path from 'path'
import { IncomingRecordingMessage, PersistedRecordingMessage } from './types'
export const convertToPersistedMessage = (message: IncomingRecordingMessage): PersistedRecordingMessage => {
return {
window_id: message.window_id,
data: message.events,
}
}
// Helper to return now as a milliseconds timestamp
export const now = () => DateTime.now().toMillis()
export const minDefined = (...args: (number | undefined)[]): number | undefined => {
const definedArgs = args.filter((arg) => arg !== undefined) as number[]
return Math.min(...definedArgs)
}
export const maxDefined = (...args: (number | undefined)[]): number | undefined => {
const definedArgs = args.filter((arg) => arg !== undefined) as number[]
return Math.max(...definedArgs)
}
export const bufferFileDir = (root: string) => path.join(root, 'session-buffer-files')

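A brief usage note on minDefined / maxDefined: undefined operands are filtered out before the comparison, which is what lets the session buffer's offsets start as an empty object instead of { lowest: Infinity, highest: -Infinity }.

// Assuming the helpers defined above:
const lowest = minDefined(undefined, 42) // => 42
const alsoLowest = minDefined(5, 42) // => 5
const highest = maxDefined(undefined, 42, 7) // => 42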

@ -7,7 +7,7 @@ import * as schedule from 'node-schedule'
import { Counter } from 'prom-client'
import { getPluginServerCapabilities } from '../capabilities'
import { defaultConfig, sessionRecordingBlobConsumerConfig } from '../config/config'
import { defaultConfig, sessionRecordingConsumerConfig } from '../config/config'
import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types'
import { createHub, createKafkaClient, createStatsdClient } from '../utils/db/hub'
import { captureEventLoopMetrics } from '../utils/metrics'
@ -31,8 +31,8 @@ import {
startAsyncWebhooksHandlerConsumer,
} from './ingestion-queues/on-event-handler-consumer'
import { startScheduledTasksConsumer } from './ingestion-queues/scheduled-tasks-consumer'
import { SessionRecordingBlobIngester } from './ingestion-queues/session-recording/session-recordings-blob-consumer'
import { startSessionRecordingEventsConsumer } from './ingestion-queues/session-recording/session-recordings-consumer'
import { startSessionRecordingEventsConsumerV1 } from './ingestion-queues/session-recording/session-recordings-consumer-v1'
import { SessionRecordingIngesterV2 } from './ingestion-queues/session-recording/session-recordings-consumer-v2'
import { createHttpServer } from './services/http-server'
import { getObjectStorage } from './services/object_storage'
@ -393,7 +393,7 @@ export async function startPluginsServer(
stop,
isHealthy: isSessionRecordingsHealthy,
join,
} = await startSessionRecordingEventsConsumer({
} = await startSessionRecordingEventsConsumerV1({
teamManager: teamManager,
kafkaConfig: serverConfig,
consumerMaxBytes: serverConfig.KAFKA_CONSUMPTION_MAX_BYTES,
@ -408,17 +408,20 @@ export async function startPluginsServer(
}
if (capabilities.sessionRecordingBlobIngestion) {
const blobServerConfig = sessionRecordingBlobConsumerConfig(serverConfig)
const postgres = hub?.postgres ?? createPostgresPool(blobServerConfig.DATABASE_URL)
const s3 = hub?.objectStorage ?? getObjectStorage(blobServerConfig)
const redisPool = hub?.db.redisPool ?? createRedisPool(blobServerConfig)
const recordingConsumerConfig = sessionRecordingConsumerConfig(serverConfig)
const postgres = hub?.postgres ?? createPostgresPool(recordingConsumerConfig.DATABASE_URL)
const s3 = hub?.objectStorage ?? getObjectStorage(recordingConsumerConfig)
const redisPool = hub?.db.redisPool ?? createRedisPool(recordingConsumerConfig)
if (!s3) {
throw new Error("Can't start session recording blob ingestion without object storage")
}
const ingester = new SessionRecordingBlobIngester(blobServerConfig, postgres, s3, redisPool)
// NOTE: We intentionally pass in the original serverConfig as the ingester uses both kafkas
const ingester = new SessionRecordingIngesterV2(serverConfig, postgres, s3, redisPool)
await ingester.start()
const batchConsumer = ingester.batchConsumer
if (batchConsumer) {
stopSessionRecordingBlobConsumer = async () => {
// Tricky - in some cases the hub is responsible, in which case it will drain and clear. Otherwise we are responsible.


@ -197,7 +197,6 @@ export interface PluginsServerConfig {
EVENT_OVERFLOW_BUCKET_REPLENISH_RATE: number
CLOUD_DEPLOYMENT: string
SESSION_RECORDING_ENABLE_OFFSET_HIGH_WATER_MARK_PROCESSING: boolean
// local directory might be a volume mount or a directory on disk (e.g. in local dev)
SESSION_RECORDING_LOCAL_DIRECTORY: string
SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS: number
@ -1130,3 +1129,9 @@ export interface PipelineEvent extends Omit<PluginEvent, 'team_id'> {
}
export type RedisPool = GenericPool<Redis>
export type RRWebEvent = Record<string, any> & {
timestamp: number
type: number
data: any
}


@ -3,7 +3,7 @@ import { PluginEvent, Properties } from '@posthog/plugin-scaffold'
import * as Sentry from '@sentry/node'
import { DateTime } from 'luxon'
import { activeMilliseconds, RRWebEventSummary } from '../../main/ingestion-queues/session-recording/snapshot-segmenter'
import { activeMilliseconds } from '../../main/ingestion-queues/session-recording/snapshot-segmenter'
import {
Element,
GroupTypeIndex,
@ -15,6 +15,7 @@ import {
RawClickHouseEvent,
RawPerformanceEvent,
RawSessionRecordingEvent,
RRWebEvent,
Team,
TimestampFormat,
} from '../../types'
@ -241,7 +242,6 @@ export const createSessionRecordingEvent = (
team_id: number,
distinct_id: string,
timestamp: DateTime,
ip: string | null,
properties: Properties
) => {
const timestampString = castTimestampOrNow(timestamp, TimestampFormat.ClickHouse)
@ -282,33 +282,19 @@ export const createSessionReplayEvent = (
uuid: string,
team_id: number,
distinct_id: string,
ip: string | null,
properties: Properties
session_id: string,
events: RRWebEvent[]
) => {
const chunkIndex = properties['$snapshot_data']?.chunk_index
// only the first chunk has the eventsSummary
// we can ignore subsequent chunks for calculating a replay event
if (chunkIndex > 0) {
return null
}
const eventsSummaries: RRWebEventSummary[] = properties['$snapshot_data']?.['events_summary'] || []
const timestamps = eventsSummaries
.filter((eventSummary: RRWebEventSummary) => {
return !!eventSummary?.timestamp
})
.map((eventSummary: RRWebEventSummary) => {
return castTimestampOrNow(DateTime.fromMillis(eventSummary.timestamp), TimestampFormat.ClickHouse)
})
const timestamps = events
.filter((e) => !!e?.timestamp)
.map((e) => castTimestampOrNow(DateTime.fromMillis(e.timestamp), TimestampFormat.ClickHouse))
.sort()
// but every event where chunk index = 0 must have an eventsSummary
if (eventsSummaries.length === 0 || timestamps.length === 0) {
if (events.length === 0 || timestamps.length === 0) {
status.warn('🙈', 'ignoring an empty session recording event', {
session_id: properties['$session_id'],
properties: properties,
session_id,
events,
})
// it is safe to throw here as it is caught a level up so that we can see this happening in Sentry
throw new Error('ignoring an empty session recording event')
@ -321,21 +307,21 @@ export const createSessionReplayEvent = (
let consoleWarnCount = 0
let consoleErrorCount = 0
let url: string | undefined = undefined
eventsSummaries.forEach((eventSummary: RRWebEventSummary) => {
if (eventSummary.type === 3) {
events.forEach((event) => {
if (event.type === 3) {
mouseActivity += 1
if (eventSummary.data?.source === 2) {
if (event.data?.source === 2) {
clickCount += 1
}
if (eventSummary.data?.source === 5) {
if (event.data?.source === 5) {
keypressCount += 1
}
}
if (!!eventSummary.data?.href?.trim().length && url === undefined) {
url = eventSummary.data.href
if (!!event.data?.href?.trim().length && url === undefined) {
url = event.data.href
}
if (eventSummary.type === 6 && eventSummary.data?.plugin === 'rrweb/console@1') {
const level = eventSummary.data.payload?.level
if (event.type === 6 && event.data?.plugin === 'rrweb/console@1') {
const level = event.data.payload?.level
if (level === 'log') {
consoleLogCount += 1
} else if (level === 'warn') {
@ -346,13 +332,13 @@ export const createSessionReplayEvent = (
}
})
const activeTime = activeMilliseconds(eventsSummaries)
const activeTime = activeMilliseconds(events)
const data: SummarizedSessionRecordingEvent = {
uuid,
team_id: team_id,
distinct_id: distinct_id,
session_id: properties['$session_id'],
session_id: session_id,
first_timestamp: timestamps[0],
last_timestamp: timestamps[timestamps.length - 1],
click_count: clickCount,
@ -363,7 +349,7 @@ export const createSessionReplayEvent = (
console_log_count: consoleLogCount,
console_warn_count: consoleWarnCount,
console_error_count: consoleErrorCount,
size: Buffer.byteLength(JSON.stringify(properties), 'utf8'),
size: Buffer.byteLength(JSON.stringify(events), 'utf8'),
}
return data

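A hedged usage sketch of the new createSessionReplayEvent signature (identifiers and values are illustrative; the import path for the function is omitted): the summariser is now handed the session id and the raw event list directly instead of digging them out of $snapshot properties.

import { randomUUID } from 'crypto'

// Old v1 call: createSessionReplayEvent(uuid, teamId, distinctId, ip, properties)
// New call: pass the session id and the events themselves.
const replayRecord = createSessionReplayEvent(
    randomUUID(),
    2, // team_id
    'user-123', // distinct_id
    'session-abc', // session_id
    [{ type: 3, timestamp: Date.now(), data: { source: 2, href: 'https://example.com' } }]
)
// For this input: replayRecord.click_count === 1, and the record's timestamps come from the events array.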

@ -1,4 +1,4 @@
import { IncomingRecordingMessage } from '../../../../src/main/ingestion-queues/session-recording/blob-ingester/types'
import { IncomingRecordingMessage } from '../../../../src/main/ingestion-queues/session-recording/types'
import jsonFullSnapshot from './data/snapshot-full.json'
export function createIncomingRecordingMessage(
@ -16,6 +16,7 @@ export function createIncomingRecordingMessage(
session_id: 'session_id_1',
window_id: 'window_id_1',
events: [{ ...jsonFullSnapshot }],
replayIngestionConsumer: 'v2',
...partialIncomingMessage,
metadata: {


@ -1,10 +1,10 @@
import { TopicPartition } from 'kafkajs'
import {
OffsetHighWaterMarker,
offsetHighWaterMarkKey,
SessionOffsetHighWaterMark,
SessionOffsetHighWaterMarks,
} from '../../../../../src/main/ingestion-queues/session-recording/blob-ingester/session-offset-high-water-mark'
OffsetHighWaterMarks,
} from '../../../../../src/main/ingestion-queues/session-recording/services/offset-high-water-marker'
import { Hub } from '../../../../../src/types'
import { createHub } from '../../../../../src/utils/db/hub'
@ -13,7 +13,7 @@ describe('session offset high-water mark', () => {
let hub: Hub
let closeHub: () => Promise<void>
const keyPrefix = 'test-high-water-mark'
let sessionOffsetHighWaterMark: SessionOffsetHighWaterMark
let offsetHighWaterMarker: OffsetHighWaterMarker
async function deletePrefixedKeys() {
const redisClient = await hub.redisPool.acquire()
@ -32,7 +32,7 @@ describe('session offset high-water mark', () => {
const redisValue = await client.zrange(key, 0, -1, 'WITHSCORES')
await hub.redisPool.release(client)
return redisValue.reduce((acc: SessionOffsetHighWaterMarks, value: string, index: number) => {
return redisValue.reduce((acc: OffsetHighWaterMarks, value: string, index: number) => {
if (index % 2 === 0) {
acc[value] = parseInt(redisValue[index + 1])
}
@ -42,7 +42,7 @@ describe('session offset high-water mark', () => {
beforeEach(async () => {
;[hub, closeHub] = await createHub()
sessionOffsetHighWaterMark = new SessionOffsetHighWaterMark(hub.redisPool, keyPrefix)
offsetHighWaterMarker = new OffsetHighWaterMarker(hub.redisPool, keyPrefix)
})
afterEach(async () => {
@ -51,18 +51,18 @@ describe('session offset high-water mark', () => {
})
const expectMemoryAndRedisToEqual = async (tp: TopicPartition, toEqual: any) => {
expect(await sessionOffsetHighWaterMark.getWaterMarks(tp)).toEqual(toEqual)
expect(await offsetHighWaterMarker.getWaterMarks(tp)).toEqual(toEqual)
expect(await getWaterMarksFromRedis(tp)).toEqual(toEqual)
}
describe('with no existing high-water marks', () => {
it('can remove all high-water marks based on a given offset', async () => {
await sessionOffsetHighWaterMark.onCommit({ topic: 'topic', partition: 1 }, 12)
await offsetHighWaterMarker.clear({ topic: 'topic', partition: 1 }, 12)
await expectMemoryAndRedisToEqual({ topic: 'topic', partition: 1 }, {})
})
it('can add a high-water mark', async () => {
await sessionOffsetHighWaterMark.add({ topic: 'topic', partition: 1 }, 'some-session', 123)
await offsetHighWaterMarker.add({ topic: 'topic', partition: 1 }, 'some-session', 123)
await expectMemoryAndRedisToEqual(
{ topic: 'topic', partition: 1 },
{
@ -73,8 +73,8 @@ describe('session offset high-water mark', () => {
it('can get multiple watermarks without clashes', async () => {
const results = await Promise.all([
sessionOffsetHighWaterMark.getWaterMarks({ topic: 'topic', partition: 1 }),
sessionOffsetHighWaterMark.getWaterMarks({ topic: 'topic', partition: 2 }),
offsetHighWaterMarker.getWaterMarks({ topic: 'topic', partition: 1 }),
offsetHighWaterMarker.getWaterMarks({ topic: 'topic', partition: 2 }),
])
expect(results).toEqual([{}, {}])
@ -82,9 +82,9 @@ describe('session offset high-water mark', () => {
it('can add multiple high-water marks in parallel', async () => {
await Promise.all([
sessionOffsetHighWaterMark.add({ topic: 'topic', partition: 1 }, 'some-session', 10),
sessionOffsetHighWaterMark.add({ topic: 'topic', partition: 1 }, 'some-session2', 20),
sessionOffsetHighWaterMark.add({ topic: 'topic', partition: 2 }, 'some-session3', 30),
offsetHighWaterMarker.add({ topic: 'topic', partition: 1 }, 'some-session', 10),
offsetHighWaterMarker.add({ topic: 'topic', partition: 1 }, 'some-session2', 20),
offsetHighWaterMarker.add({ topic: 'topic', partition: 2 }, 'some-session3', 30),
])
await expectMemoryAndRedisToEqual(
@ -107,11 +107,11 @@ describe('session offset high-water mark', () => {
describe('with existing high-water marks', () => {
beforeEach(async () => {
// works even before anything is written to redis
expect(await sessionOffsetHighWaterMark.getWaterMarks({ topic: 'topic', partition: 1 })).toStrictEqual({})
expect(await offsetHighWaterMarker.getWaterMarks({ topic: 'topic', partition: 1 })).toStrictEqual({})
await sessionOffsetHighWaterMark.add({ topic: 'topic', partition: 1 }, 'some-session', 123)
await sessionOffsetHighWaterMark.add({ topic: 'topic', partition: 1 }, 'another-session', 12)
await sessionOffsetHighWaterMark.add({ topic: 'topic', partition: 2 }, 'a-third-session', 1)
await offsetHighWaterMarker.add({ topic: 'topic', partition: 1 }, 'some-session', 123)
await offsetHighWaterMarker.add({ topic: 'topic', partition: 1 }, 'another-session', 12)
await offsetHighWaterMarker.add({ topic: 'topic', partition: 2 }, 'a-third-session', 1)
})
it('can get high-water marks for all sessions for a partition', async () => {
@ -125,7 +125,7 @@ describe('session offset high-water mark', () => {
})
it('can remove all high-water marks based on a given offset', async () => {
await sessionOffsetHighWaterMark.onCommit({ topic: 'topic', partition: 1 }, 12)
await offsetHighWaterMarker.clear({ topic: 'topic', partition: 1 }, 12)
// the commit updates redis
// removes all high-water marks that are <= 12
@ -156,7 +156,7 @@ describe('session offset high-water mark', () => {
await Promise.allSettled(
partitionOneTestCases.map(async ([offset, expected]) => {
expect(
await sessionOffsetHighWaterMark.isBelowHighWaterMark(
await offsetHighWaterMarker.isBelowHighWaterMark(
{ topic: 'topic', partition: 1 },
'some-session',
offset
@ -169,7 +169,7 @@ describe('session offset high-water mark', () => {
it('can check if an offset is below the high-water mark even if we have never seen it before', async () => {
// there is nothing for a partition? we are always below the high-water mark
expect(
await sessionOffsetHighWaterMark.isBelowHighWaterMark(
await offsetHighWaterMarker.isBelowHighWaterMark(
{ topic: 'topic', partition: 1 },
'anything we did not add yet',
5432


@ -3,8 +3,8 @@ import { createReadStream, createWriteStream } from 'fs'
import { DateTime, Settings } from 'luxon'
import { defaultConfig } from '../../../../../src/config/config'
import { SessionManager } from '../../../../../src/main/ingestion-queues/session-recording/blob-ingester/session-manager'
import { now } from '../../../../../src/main/ingestion-queues/session-recording/blob-ingester/utils'
import { SessionManager } from '../../../../../src/main/ingestion-queues/session-recording/services/session-manager'
import { now } from '../../../../../src/main/ingestion-queues/session-recording/utils'
import { createIncomingRecordingMessage } from '../fixtures'
const createMockStream = () => {


@ -3,8 +3,8 @@ import LibrdKafkaError from 'node-rdkafka-acosom/lib/error'
import { Pool } from 'pg'
import { defaultConfig } from '../../../../src/config/config'
import { now } from '../../../../src/main/ingestion-queues/session-recording/blob-ingester/utils'
import { eachBatch } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-consumer'
import { eachBatch } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-consumer-v1'
import { now } from '../../../../src/main/ingestion-queues/session-recording/utils'
import { TeamManager } from '../../../../src/worker/ingestion/team-manager'
import { createOrganization, createTeam } from '../../../helpers/sql'
@ -104,6 +104,32 @@ describe('session-recordings-consumer', () => {
expect(producer.produce).toHaveBeenCalledTimes(2)
})
test('eachBatch does not emit replay event if set to other consumer', async () => {
const organizationId = await createOrganization(postgres)
const teamId = await createTeam(postgres, organizationId)
const eachBatchWithDependencies: any = eachBatch({ producer, teamManager })
await eachBatchWithDependencies([
{
key: 'test',
value: JSON.stringify({
team_id: teamId,
data: JSON.stringify({
event: '$snapshot',
properties: {
$snapshot_data: { events_summary: [{ timestamp: now() }] },
$snapshot_consumer: 'v2',
},
}),
}),
timestamp: 123,
},
])
expect(producer.produce).toHaveBeenCalledTimes(1)
})
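A hedged sketch of the check the new test above exercises: the legacy (v1) batch consumer only emits a replay summary when the event is marked for it, so a message tagged with $snapshot_consumer: 'v2' yields one fewer produce call. The helper name and the default for unmarked events are assumptions for illustration.

// Sketch: honour the $snapshot_consumer marker so each event is summarised by exactly one consumer.
interface SnapshotEventProperties {
    $snapshot_consumer?: 'v1' | 'v2'
}

function shouldEmitReplaySummary(properties: SnapshotEventProperties, thisConsumer: 'v1' | 'v2'): boolean {
    // assume events without a marker predate the split and fall to the legacy consumer
    return (properties.$snapshot_consumer ?? 'v1') === thisConsumer
}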
test('eachBatch does not emit a replay record that is more than a month in the future', async () => {
const organizationId = await createOrganization(postgres)
const teamId = await createTeam(postgres, organizationId)

View File

@ -3,13 +3,13 @@ import path from 'path'
import { waitForExpect } from '../../../../functional_tests/expectations'
import { defaultConfig } from '../../../../src/config/config'
import { SessionRecordingBlobIngester } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-blob-consumer'
import { SessionRecordingIngesterV2 } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-consumer-v2'
import { Hub, PluginsServerConfig } from '../../../../src/types'
import { createHub } from '../../../../src/utils/db/hub'
import { UUIDT } from '../../../../src/utils/utils'
import { createIncomingRecordingMessage } from './fixtures'
function assertIngesterHasExpectedPartitions(ingester: SessionRecordingBlobIngester, expectedPartitions: number[]) {
function assertIngesterHasExpectedPartitions(ingester: SessionRecordingIngesterV2, expectedPartitions: number[]) {
const partitions: Set<number> = new Set()
Object.values(ingester.sessions).forEach((session) => {
partitions.add(session.partition)
@ -23,8 +23,8 @@ describe('ingester rebalancing tests', () => {
SESSION_RECORDING_LOCAL_DIRECTORY: '.tmp/test-session-recordings',
}
let ingesterOne: SessionRecordingBlobIngester
let ingesterTwo: SessionRecordingBlobIngester
let ingesterOne: SessionRecordingIngesterV2
let ingesterTwo: SessionRecordingIngesterV2
let hub: Hub
let closeHub: () => Promise<void>
@ -48,7 +48,7 @@ describe('ingester rebalancing tests', () => {
})
it('rebalances partitions safely from one to two consumers', async () => {
ingesterOne = new SessionRecordingBlobIngester(config, hub.postgres, hub.objectStorage, hub.redisPool)
ingesterOne = new SessionRecordingIngesterV2(config, hub.postgres, hub.objectStorage, hub.redisPool)
await ingesterOne.start()
@ -59,7 +59,7 @@ describe('ingester rebalancing tests', () => {
assertIngesterHasExpectedPartitions(ingesterOne, [1])
})
ingesterTwo = new SessionRecordingBlobIngester(config, hub.postgres, hub.objectStorage, hub.redisPool)
ingesterTwo = new SessionRecordingIngesterV2(config, hub.postgres, hub.objectStorage, hub.redisPool)
await ingesterTwo.start()
await waitForExpect(() => {

View File

@ -3,7 +3,7 @@ import path from 'path'
import { waitForExpect } from '../../../../functional_tests/expectations'
import { defaultConfig } from '../../../../src/config/config'
import { SessionRecordingBlobIngester } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-blob-consumer'
import { SessionRecordingIngesterV2 } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-consumer-v2'
import { Hub, PluginsServerConfig } from '../../../../src/types'
import { createHub } from '../../../../src/utils/db/hub'
import { createIncomingRecordingMessage } from './fixtures'
@ -50,7 +50,7 @@ describe('ingester', () => {
SESSION_RECORDING_LOCAL_DIRECTORY: '.tmp/test-session-recordings',
}
let ingester: SessionRecordingBlobIngester
let ingester: SessionRecordingIngesterV2
let hub: Hub
let closeHub: () => Promise<void>
@ -84,7 +84,7 @@ describe('ingester', () => {
// these tests assume that a flush won't run while they run
beforeEach(async () => {
ingester = new SessionRecordingBlobIngester(
ingester = new SessionRecordingIngesterV2(
{
...defaultConfig,
SESSION_RECORDING_REDIS_OFFSET_STORAGE_KEY: keyPrefix,

View File

@ -11,7 +11,6 @@ import * as IORedis from 'ioredis'
import { DateTime } from 'luxon'
import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../../src/config/kafka-topics'
import { RRWebEventSummary } from '../../src/main/ingestion-queues/session-recording/snapshot-segmenter'
import {
ClickHouseEvent,
Database,
@ -20,6 +19,7 @@ import {
Person,
PluginsServerConfig,
PropertyDefinitionTypeEnum,
RRWebEvent,
Team,
} from '../../src/types'
import { createHub } from '../../src/utils/db/hub'
@ -1191,14 +1191,10 @@ test('capture first team event', async () => {
})
test('snapshot event stored as session_recording_event', () => {
const data = createSessionRecordingEvent(
'some-id',
team.id,
'5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4',
now,
'',
{ $session_id: 'abcf-efg', $snapshot_data: { timestamp: 123 } } as any as Properties
)
const data = createSessionRecordingEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', now, {
$session_id: 'abcf-efg',
$snapshot_data: { timestamp: 123 },
} as any as Properties)
expect(data).toEqual({
created_at: expect.stringMatching(/^\d{4}-\d{2}-\d{2} [\d\s:]+/),
@ -1212,7 +1208,7 @@ test('snapshot event stored as session_recording_event', () => {
})
})
const sessionReplayEventTestCases: {
snapshotData: { events_summary: RRWebEventSummary[] }
snapshotData: { events_summary: RRWebEvent[] }
expected: Pick<
SummarizedSessionRecordingEvent,
| 'click_count'
@ -1241,7 +1237,7 @@ const sessionReplayEventTestCases: {
console_log_count: 0,
console_warn_count: 0,
console_error_count: 0,
size: 136,
size: 73,
},
},
{
@ -1257,7 +1253,7 @@ const sessionReplayEventTestCases: {
console_log_count: 0,
console_warn_count: 0,
console_error_count: 0,
size: 136,
size: 73,
},
},
{
@ -1313,7 +1309,7 @@ const sessionReplayEventTestCases: {
console_log_count: 2,
console_warn_count: 3,
console_error_count: 1,
size: 825,
size: 762,
},
},
{
@ -1351,7 +1347,7 @@ const sessionReplayEventTestCases: {
console_log_count: 0,
console_warn_count: 0,
console_error_count: 0,
size: 276,
size: 213,
},
},
{
@ -1378,16 +1374,19 @@ const sessionReplayEventTestCases: {
console_log_count: 0,
console_warn_count: 0,
console_error_count: 0,
size: 496,
size: 433,
},
},
]
sessionReplayEventTestCases.forEach(({ snapshotData, expected }) => {
test(`snapshot event ${JSON.stringify(snapshotData)} can be stored as session_replay_event`, () => {
const data = createSessionReplayEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', '', {
$session_id: 'abcf-efg',
$snapshot_data: snapshotData,
} as any as Properties)
const data = createSessionReplayEvent(
'some-id',
team.id,
'5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4',
'abcf-efg',
snapshotData.events_summary
)
const expectedEvent: SummarizedSessionRecordingEvent = {
distinct_id: '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4',
@ -1402,37 +1401,29 @@ sessionReplayEventTestCases.forEach(({ snapshotData, expected }) => {
test(`snapshot event with no event summary is ignored`, () => {
expect(() => {
createSessionReplayEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', '', {
$session_id: 'abcf-efg',
$snapshot_data: {},
} as any as Properties)
createSessionReplayEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', 'abcf-efg', [])
}).toThrowError()
})
test(`snapshot event with no event summary timestamps is ignored`, () => {
expect(() => {
createSessionReplayEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', '', {
$session_id: 'abcf-efg',
$snapshot_data: {
events_summary: [
{
type: 5,
data: {
payload: {
// doesn't match because href is nested in payload
href: 'http://127.0.0.1:8000/home',
},
},
createSessionReplayEvent('some-id', team.id, '5AzhubH8uMghFHxXq0phfs14JOjH6SA2Ftr1dzXj7U4', 'abcf-efg', [
{
type: 5,
data: {
payload: {
// doesn't match because href is nested in payload
href: 'http://127.0.0.1:8000/home',
},
{
type: 4,
data: {
href: 'http://127.0.0.1:8000/second/url',
},
},
],
},
},
} as any as Properties)
{
type: 4,
data: {
href: 'http://127.0.0.1:8000/second/url',
},
},
] as any[])
}).toThrowError()
})
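
For context, a minimal sketch of the guard behaviour the two tests above pin down: a replay event cannot be built from an empty events summary or from entries with no timestamps. The real createSessionReplayEvent also derives click counts, console counts, URLs and size, which are omitted here; the function and type names below are illustrative.

// Sketch: reject snapshot payloads that cannot yield a first/last timestamp.
type RRWebEventSketch = { type: number; timestamp?: number; data?: Record<string, any> }

function timestampsFrom(eventsSummary: RRWebEventSketch[]): number[] {
    return eventsSummary
        .map((event) => event.timestamp)
        .filter((timestamp): timestamp is number => typeof timestamp === 'number')
}

function assertSummarisable(eventsSummary: RRWebEventSketch[]): void {
    if (eventsSummary.length === 0) {
        throw new Error('No events summary provided')
    }
    if (timestampsFrom(eventsSummary).length === 0) {
        throw new Error('Events summary contains no timestamps')
    }
}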

View File

@ -164,6 +164,7 @@ def log_event(data: Dict, event_name: str, partition_key: Optional[str]):
producer = sessionRecordingKafkaProducer()
else:
producer = KafkaProducer()
future = producer.produce(topic=kafka_topic, data=data, key=partition_key)
statsd.incr("posthog_cloud_plugin_server_ingestion")
return future
@ -365,6 +366,8 @@ def get_event(request):
# NOTE: Whilst we are testing this code, we want to track exceptions but allow the events through if anything goes wrong
capture_exception(e)
consumer_destination = "v2" if random() <= settings.REPLAY_EVENTS_NEW_CONSUMER_RATIO else "v1"
try:
replay_events, other_events = split_replay_events(events)
processed_replay_events = replay_events
@ -373,6 +376,10 @@ def get_event(request):
# Legacy solution stays in place
processed_replay_events = legacy_preprocess_session_recording_events_for_clickhouse(replay_events)
# Mark all events so that they are only consumed by one consumer
for event in processed_replay_events:
event["properties"]["$snapshot_consumer"] = consumer_destination
events = processed_replay_events + other_events
except ValueError as e:
@ -445,12 +452,16 @@ def get_event(request):
)
try:
if replay_events and random() <= settings.REPLAY_BLOB_INGESTION_TRAFFIC_RATIO:
if replay_events:
# The new flow is only enabled if the dedicated Kafka is enabled
alternative_replay_events = preprocess_replay_events_for_blob_ingestion(
replay_events, settings.SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES
)
# Mark all events so that they are only consumed by one consumer
for event in alternative_replay_events:
event["properties"]["$snapshot_consumer"] = consumer_destination
futures = []
# We want to be super careful with our new ingestion flow for now so the whole thing is separated
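
A hedged sketch of the routing the capture hunk above adds: one random draw per request picks the destination consumer, and every processed replay event is stamped with it so only one consumer acts on each event. Written in TypeScript for illustration; the ratio value and the event shape are assumptions.

// Sketch of per-request consumer routing, mirroring the Python capture code above.
type ReplayEvent = { properties: Record<string, any> }

function routeReplayEvents(events: ReplayEvent[], newConsumerRatio: number): ReplayEvent[] {
    // one draw per request, so all replay events in this request share a destination
    const consumerDestination = Math.random() <= newConsumerRatio ? 'v2' : 'v1'
    return events.map((event) => ({
        ...event,
        properties: { ...event.properties, $snapshot_consumer: consumerDestination },
    }))
}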

View File

@ -1,6 +1,6 @@
# name: TestCohort.test_async_deletion_of_cohort
'
/* user_id:103 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
/* user_id:102 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
SELECT count(DISTINCT person_id)
FROM cohortpeople
WHERE team_id = 2
@ -10,7 +10,7 @@
---
# name: TestCohort.test_async_deletion_of_cohort.1
'
/* user_id:103 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
/* user_id:102 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
INSERT INTO cohortpeople
SELECT id,
2 as cohort_id,
@ -114,7 +114,7 @@
---
# name: TestCohort.test_async_deletion_of_cohort.2
'
/* user_id:103 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
/* user_id:102 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
SELECT count(DISTINCT person_id)
FROM cohortpeople
WHERE team_id = 2
@ -124,7 +124,7 @@
---
# name: TestCohort.test_async_deletion_of_cohort.3
'
/* user_id:103 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
/* user_id:102 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
SELECT count()
FROM cohortpeople
WHERE team_id = 2
@ -134,7 +134,7 @@
---
# name: TestCohort.test_async_deletion_of_cohort.4
'
/* user_id:103 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
/* user_id:102 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
SELECT count(DISTINCT person_id)
FROM cohortpeople
WHERE team_id = 2
@ -144,7 +144,7 @@
---
# name: TestCohort.test_async_deletion_of_cohort.5
'
/* user_id:103 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
/* user_id:102 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
INSERT INTO cohortpeople
SELECT id,
2 as cohort_id,
@ -178,7 +178,7 @@
---
# name: TestCohort.test_async_deletion_of_cohort.6
'
/* user_id:103 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
/* user_id:102 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
SELECT count(DISTINCT person_id)
FROM cohortpeople
WHERE team_id = 2
@ -188,7 +188,7 @@
---
# name: TestCohort.test_async_deletion_of_cohort.7
'
/* user_id:103 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
/* user_id:102 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
SELECT count()
FROM cohortpeople
WHERE team_id = 2

View File

@ -1186,7 +1186,7 @@ class TestCapture(BaseTest):
def test_legacy_recording_ingestion_data_sent_to_kafka(self, kafka_produce) -> None:
session_id = "some_session_id"
self._send_session_recording_event(session_id=session_id)
self.assertEqual(kafka_produce.call_count, 1)
self.assertEqual(kafka_produce.call_count, 2)
kafka_topic_used = kafka_produce.call_args_list[0][1]["topic"]
self.assertEqual(kafka_topic_used, KAFKA_SESSION_RECORDING_EVENTS)
key = kafka_produce.call_args_list[0][1]["key"]
@ -1204,17 +1204,16 @@ class TestCapture(BaseTest):
snapshot_source = 8
snapshot_type = 8
event_data = {"foo": "bar"}
with self.settings(REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=0):
self._send_session_recording_event(
timestamp=timestamp,
snapshot_source=snapshot_source,
snapshot_type=snapshot_type,
session_id=session_id,
distinct_id=distinct_id,
window_id=window_id,
event_data=event_data,
)
self.assertEqual(kafka_produce.call_count, 1)
self._send_session_recording_event(
timestamp=timestamp,
snapshot_source=snapshot_source,
snapshot_type=snapshot_type,
session_id=session_id,
distinct_id=distinct_id,
window_id=window_id,
event_data=event_data,
)
self.assertEqual(kafka_produce.call_count, 2)
self.assertEqual(kafka_produce.call_args_list[0][1]["topic"], KAFKA_SESSION_RECORDING_EVENTS)
key = kafka_produce.call_args_list[0][1]["key"]
self.assertEqual(key, session_id)
@ -1225,6 +1224,7 @@ class TestCapture(BaseTest):
{
"event": "$snapshot",
"properties": {
"$snapshot_consumer": "v1",
"$snapshot_data": {
"chunk_count": 1,
"chunk_id": "fake-uuid",
@ -1253,12 +1253,13 @@ class TestCapture(BaseTest):
self._send_session_recording_event(event_data=large_data_array)
topic_counter = Counter([call[1]["topic"] for call in kafka_produce.call_args_list])
assert topic_counter == Counter({KAFKA_SESSION_RECORDING_EVENTS: 3})
assert topic_counter == Counter(
{KAFKA_SESSION_RECORDING_EVENTS: 3, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS: 1}
)
@patch("posthog.kafka_client.client._KafkaProducer.produce")
def test_recording_ingestion_can_write_to_blob_ingestion_topic_with_usual_size_limit(self, kafka_produce) -> None:
with self.settings(
REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=1,
SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=512,
):
self._send_session_recording_event(event_data=large_data_array)
@ -1272,7 +1273,6 @@ class TestCapture(BaseTest):
@patch("posthog.kafka_client.client._KafkaProducer.produce")
def test_recording_ingestion_can_write_to_blob_ingestion_topic(self, kafka_produce) -> None:
with self.settings(
REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=1,
SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=20480,
):
self._send_session_recording_event(event_data=large_data_array)
@ -1292,7 +1292,6 @@ class TestCapture(BaseTest):
KAFKA_SECURITY_PROTOCOL="SASL_SSL",
SESSION_RECORDING_KAFKA_HOSTS=["another-server:9092", "a-fourth.server:9092"],
SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL="SSL",
REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=1,
SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=1234,
):
# avoid logs from being printed because the mock is None
@ -1323,7 +1322,6 @@ class TestCapture(BaseTest):
with self.settings(
KAFKA_HOSTS=["first.server:9092", "second.server:9092"],
SESSION_RECORDING_KAFKA_HOSTS=["another-server:9092", "a-fourth.server:9092"],
REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=1,
):
default_kafka_producer_mock.return_value = KafkaProducer()
session_recording_producer_factory_mock.return_value = sessionRecordingKafkaProducer()
@ -1341,27 +1339,6 @@ class TestCapture(BaseTest):
assert data_sent_to_recording_kafka["event"] == "$snapshot_items"
assert len(data_sent_to_recording_kafka["properties"]["$snapshot_items"]) == 1
@patch("posthog.api.capture.sessionRecordingKafkaProducer")
@patch("posthog.api.capture.KafkaProducer")
@patch("posthog.kafka_client.client._KafkaProducer.produce")
def test_uses_does_not_produce_if_blob_ingestion_disabled(
self,
kafka_produce: MagicMock,
default_kafka_producer_mock: MagicMock,
session_recording_producer_mock: MagicMock,
) -> None:
with self.settings(REPLAY_BLOB_INGESTION_TRAFFIC_RATIO=0):
default_kafka_producer_mock.return_value = KafkaProducer()
session_recording_producer_mock.side_effect = sessionRecordingKafkaProducer()
data = "example"
self._send_session_recording_event(event_data=data)
default_kafka_producer_mock.assert_called()
session_recording_producer_mock.assert_not_called()
assert len(kafka_produce.call_args_list) == 1
assert json.loads(kafka_produce.call_args_list[0][1]["data"]["data"])["event"] == "$snapshot"
def test_get_distinct_id_non_json_properties(self) -> None:
with self.assertRaises(ValueError):
get_distinct_id({"properties": "str"})
@ -1415,7 +1392,7 @@ class TestCapture(BaseTest):
replace_limited_team_tokens(QuotaResource.RECORDINGS, {self.team.api_token: timezone.now().timestamp() + 10000})
replace_limited_team_tokens(QuotaResource.EVENTS, {self.team.api_token: timezone.now().timestamp() + 10000})
self._send_session_recording_event()
self.assertEqual(kafka_produce.call_count, 1)
self.assertEqual(kafka_produce.call_count, 2)
@patch("posthog.kafka_client.client._KafkaProducer.produce")
@pytest.mark.ee
@ -1440,11 +1417,11 @@ class TestCapture(BaseTest):
with self.settings(QUOTA_LIMITING_ENABLED=True):
_produce_events()
self.assertEqual(kafka_produce.call_count, 3)
self.assertEqual(kafka_produce.call_count, 4)
replace_limited_team_tokens(QuotaResource.EVENTS, {self.team.api_token: timezone.now().timestamp() + 10000})
_produce_events()
self.assertEqual(kafka_produce.call_count, 1) # Only the recording event
self.assertEqual(kafka_produce.call_count, 2) # Only the recording event
replace_limited_team_tokens(
QuotaResource.RECORDINGS, {self.team.api_token: timezone.now().timestamp() + 10000}
@ -1457,7 +1434,7 @@ class TestCapture(BaseTest):
)
replace_limited_team_tokens(QuotaResource.EVENTS, {self.team.api_token: timezone.now().timestamp() - 10000})
_produce_events()
self.assertEqual(kafka_produce.call_count, 3) # All events as limit-until timestamp is in the past
self.assertEqual(kafka_produce.call_count, 4) # All events as limit-until timestamp is in the past
@patch("posthog.kafka_client.client._KafkaProducer.produce")
def test_capture_historical_analytics_events(self, kafka_produce) -> None:

View File

@ -31,10 +31,10 @@ PARTITION_KEY_BUCKET_REPLENTISH_RATE = get_from_env(
)
REPLAY_EVENT_MAX_SIZE = get_from_env("REPLAY_EVENT_MAX_SIZE", type_cast=int, default=1024 * 512) # 512kb
REPLAY_BLOB_INGESTION_TRAFFIC_RATIO = get_from_env("REPLAY_BLOB_INGESTION_TRAFFIC_RATIO", type_cast=float, default=0.0)
REPLAY_EVENTS_NEW_CONSUMER_RATIO = get_from_env("REPLAY_EVENTS_NEW_CONSUMER_RATIO", type_cast=float, default=0.0)
if REPLAY_BLOB_INGESTION_TRAFFIC_RATIO > 1 or REPLAY_BLOB_INGESTION_TRAFFIC_RATIO < 0:
if REPLAY_EVENTS_NEW_CONSUMER_RATIO > 1 or REPLAY_EVENTS_NEW_CONSUMER_RATIO < 0:
logger.critical(
"Environment variable REPLAY_BLOB_INGESTION_TRAFFIC_RATIO is not between 0 and 1. Setting to 0 to be safe."
"Environment variable REPLAY_EVENTS_NEW_CONSUMER_RATIO is not between 0 and 1. Setting to 0 to be safe."
)
REPLAY_BLOB_INGESTION_TRAFFIC_RATIO = 0
REPLAY_EVENTS_NEW_CONSUMER_RATIO = 0
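
A sketch of the same guard in TypeScript terms: read the ratio from the environment and fall back to 0 when it is outside [0, 1]. The env-var name and clamp-to-zero behaviour come from the hunk above; the logging call and function name are illustrative.

// Sketch: clamp an ill-configured traffic ratio to 0 rather than failing startup.
function replayEventsNewConsumerRatio(env: Record<string, string | undefined> = process.env): number {
    const raw = Number(env.REPLAY_EVENTS_NEW_CONSUMER_RATIO ?? '0')
    if (Number.isNaN(raw) || raw < 0 || raw > 1) {
        console.error('REPLAY_EVENTS_NEW_CONSUMER_RATIO is not between 0 and 1. Setting to 0 to be safe.')
        return 0
    }
    return raw
}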