mirror of
https://github.com/PostHog/posthog.git
synced 2024-12-01 04:12:23 +01:00
feat: key blobs by event times (#15640)
* feat: key blobs by event times
* remove TODO
* softly softly
* Update plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/session-manager.ts
Co-authored-by: Ben White <ben@posthog.com>
---------
Co-authored-by: Ben White <ben@posthog.com>
This commit is contained in:
parent
4bd9d844c9
commit
e6551c31c2
@ -61,6 +61,11 @@ export const gaugePendingChunksBlocking = new Gauge({
|
||||
|
||||
const ESTIMATED_GZIP_COMPRESSION_RATIO = 0.1
|
||||
|
||||
/**
 * Span of event timestamps covered by the messages held in a session buffer.
 * The buffer stores this as `eventsRange` (null until a message has been added),
 * and flush uses both ends to build the `<firstTimestamp>-<lastTimestamp>` part
 * of the uploaded data key.
 * NOTE(review): values look like epoch milliseconds (the tests use e.g.
 * 1679568043305) — confirm against the producer of `events_summary`.
 */
interface EventsRange {
|
||||
// Minimum of the previous value and the first `events_summary` entry's timestamp of each added message.
firstTimestamp: number
|
||||
// Maximum of the previous value and the last `events_summary` entry's timestamp of each added message.
lastTimestamp: number
|
||||
}
|
||||
|
||||
// The buffer is a list of messages grouped
|
||||
type SessionBuffer = {
|
||||
id: string
|
||||
@ -69,6 +74,7 @@ type SessionBuffer = {
|
||||
size: number
|
||||
file: string
|
||||
offsets: number[]
|
||||
eventsRange: EventsRange | null
|
||||
}
|
||||
|
||||
export class SessionManager {
|
||||
@ -295,13 +301,35 @@ export class SessionManager {
|
||||
return
|
||||
}
|
||||
|
||||
if (this.buffer.count === 0) {
|
||||
status.warn('⚠️', `blob_ingester_session_manager flush called but buffer is empty`, {
|
||||
sessionId: this.sessionId,
|
||||
partition: this.partition,
|
||||
reason,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// We move the buffer to the flush buffer and create a new buffer so that we can safely write the buffer to disk
|
||||
this.flushBuffer = this.buffer
|
||||
this.buffer = this.createBuffer()
|
||||
|
||||
const eventsRange = this.flushBuffer.eventsRange
|
||||
if (!eventsRange) {
|
||||
status.warn('⚠️', `blob_ingester_session_manager flush called but eventsRange is null`, {
|
||||
sessionId: this.sessionId,
|
||||
partition: this.partition,
|
||||
reason,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
const { firstTimestamp, lastTimestamp } = eventsRange
|
||||
|
||||
try {
|
||||
const baseKey = `${this.serverConfig.SESSION_RECORDING_REMOTE_FOLDER}/team_id/${this.teamId}/session_id/${this.sessionId}`
|
||||
const dataKey = `${baseKey}/data/${this.flushBuffer.oldestKafkaTimestamp}` // TODO: Change to be based on events times
|
||||
const timeRange = `${firstTimestamp}-${lastTimestamp}`
|
||||
const dataKey = `${baseKey}/data/${timeRange}`
|
||||
|
||||
const fileStream = createReadStream(this.flushBuffer.file).pipe(zlib.createGzip())
|
||||
|
||||
@ -369,6 +397,7 @@ export class SessionManager {
|
||||
`${this.teamId}.${this.sessionId}.${id}.jsonl`
|
||||
),
|
||||
offsets: [],
|
||||
eventsRange: null,
|
||||
}
|
||||
|
||||
// NOTE: We can't do this easily async as we would need to handle the race condition of multiple events coming in at once.
|
||||
@ -391,10 +420,23 @@ export class SessionManager {
|
||||
*/
|
||||
private async addToBuffer(message: IncomingRecordingMessage): Promise<void> {
|
||||
try {
|
||||
const content = JSON.stringify(convertToPersistedMessage(message)) + '\n'
|
||||
const messageData = convertToPersistedMessage(message)
|
||||
this.buffer.eventsRange = {
|
||||
firstTimestamp: Math.min(
|
||||
message.events_summary[0].timestamp,
|
||||
this.buffer.eventsRange?.firstTimestamp ?? Infinity
|
||||
),
|
||||
lastTimestamp: Math.max(
|
||||
message.events_summary[message.events_summary.length - 1].timestamp,
|
||||
this.buffer.eventsRange?.lastTimestamp ?? 0
|
||||
),
|
||||
}
|
||||
|
||||
const content = JSON.stringify(messageData) + '\n'
|
||||
this.buffer.count += 1
|
||||
this.buffer.size += Buffer.byteLength(content)
|
||||
this.buffer.offsets.push(message.metadata.offset)
|
||||
|
||||
await appendFile(this.buffer.file, content, 'utf-8')
|
||||
} catch (error) {
|
||||
status.error('🧨', 'blob_ingester_session_manager failed writing session recording buffer to disk', {
|
||||
|
@ -1,3 +1,4 @@
|
||||
import { Upload } from '@aws-sdk/lib-storage'
|
||||
import { createReadStream, writeFileSync } from 'fs'
|
||||
import { appendFile, unlink } from 'fs/promises'
|
||||
import { DateTime, Settings } from 'luxon'
|
||||
@ -13,7 +14,24 @@ jest.mock('fs', () => {
|
||||
return {
|
||||
...jest.requireActual('fs'),
|
||||
writeFileSync: jest.fn(),
|
||||
createReadStream: jest.fn(),
|
||||
createReadStream: jest.fn().mockImplementation(() => {
|
||||
return {
|
||||
pipe: jest.fn(),
|
||||
}
|
||||
}),
|
||||
}
|
||||
})
|
||||
|
||||
// Replace the '@aws-sdk/lib-storage' module with a stub for this test file.
// `Upload` becomes a jest mock, so tests can read `Upload.mock.calls` to inspect
// the arguments each upload was constructed with (e.g. the `params.Key`).
jest.mock('@aws-sdk/lib-storage', () => {
|
||||
// Each call to the mocked `Upload` returns an object exposing only `done()`.
const mockUpload = jest.fn().mockImplementation(() => {
|
||||
return {
|
||||
// `done()` resolves immediately with undefined.
done: jest.fn().mockResolvedValue(undefined),
|
||||
}
|
||||
})
|
||||
|
||||
return {
|
||||
// Presumably needed so the named `import { Upload }` resolves against this factory's result — confirm against Jest's ES-module interop docs.
__esModule: true,
|
||||
Upload: mockUpload,
|
||||
}
|
||||
})
|
||||
|
||||
@ -62,37 +80,16 @@ describe('session-manager', () => {
|
||||
id: expect.any(String),
|
||||
size: 61, // The size of the event payload - this may change when test data changes
|
||||
offsets: [1],
|
||||
eventsRange: {
|
||||
firstTimestamp: 1679568043305,
|
||||
lastTimestamp: 1679568043305,
|
||||
},
|
||||
})
|
||||
|
||||
// the buffer file was created
|
||||
expect(writeFileSync).toHaveBeenCalledWith(sessionManager.buffer.file, '', 'utf-8')
|
||||
})
|
||||
|
||||
it('tracks buffer age span', async () => {
|
||||
const firstMessageTimestamp = DateTime.now().toMillis() - 10000
|
||||
const secondMessageTimestamp = DateTime.now().toMillis() - 5000
|
||||
|
||||
const payload = JSON.stringify([{ simple: 'data' }])
|
||||
const event = createIncomingRecordingMessage({
|
||||
data: compressToString(payload),
|
||||
})
|
||||
|
||||
event.metadata.timestamp = firstMessageTimestamp
|
||||
await sessionManager.add(event)
|
||||
|
||||
event.metadata.timestamp = secondMessageTimestamp
|
||||
await sessionManager.add(event)
|
||||
|
||||
expect(sessionManager.buffer).toEqual({
|
||||
count: 2,
|
||||
oldestKafkaTimestamp: firstMessageTimestamp,
|
||||
file: expect.any(String),
|
||||
id: expect.any(String),
|
||||
size: 61 * 2, // The size of the event payload - this may change when test data changes
|
||||
offsets: [1, 1],
|
||||
})
|
||||
})
|
||||
|
||||
it('does not flush if it has received a message recently', async () => {
|
||||
const payload = JSON.stringify([{ simple: 'data' }])
|
||||
const event = createIncomingRecordingMessage({
|
||||
@ -115,19 +112,51 @@ describe('session-manager', () => {
|
||||
})
|
||||
|
||||
it('does flush if it has not received a message recently', async () => {
|
||||
const firstTimestamp = 1679568043305
|
||||
const lastTimestamp = 1679568043305 + 4000
|
||||
|
||||
const payload = JSON.stringify([{ simple: 'data' }])
|
||||
const event = createIncomingRecordingMessage({
|
||||
const eventOne = createIncomingRecordingMessage({
|
||||
data: compressToString(payload),
|
||||
events_summary: [
|
||||
{
|
||||
timestamp: firstTimestamp,
|
||||
type: 4,
|
||||
data: { href: 'http://localhost:3001/', width: 2560, height: 1304 },
|
||||
},
|
||||
],
|
||||
})
|
||||
const eventTwo = createIncomingRecordingMessage({
|
||||
data: compressToString(payload),
|
||||
events_summary: [
|
||||
{
|
||||
timestamp: lastTimestamp,
|
||||
type: 4,
|
||||
data: { href: 'http://localhost:3001/', width: 2560, height: 1304 },
|
||||
},
|
||||
],
|
||||
})
|
||||
|
||||
const flushThreshold = 2500 // any value here...
|
||||
event.metadata.timestamp = DateTime.now().minus({ milliseconds: flushThreshold }).toMillis()
|
||||
await sessionManager.add(event)
|
||||
eventOne.metadata.timestamp = DateTime.now().minus({ milliseconds: flushThreshold }).toMillis()
|
||||
await sessionManager.add(eventOne)
|
||||
eventTwo.metadata.timestamp = DateTime.now().minus({ milliseconds: flushThreshold }).toMillis()
|
||||
await sessionManager.add(eventTwo)
|
||||
|
||||
await sessionManager.flushIfSessionBufferIsOld(DateTime.now().toMillis(), flushThreshold)
|
||||
|
||||
// as a proxy for flush having been called or not
|
||||
expect(createReadStream).toHaveBeenCalled()
|
||||
const mockUploadCalls = (Upload as unknown as jest.Mock).mock.calls
|
||||
expect(mockUploadCalls.length).toBe(1)
|
||||
expect(mockUploadCalls[0].length).toBe(1)
|
||||
expect(mockUploadCalls[0][0]).toEqual(
|
||||
expect.objectContaining({
|
||||
params: expect.objectContaining({
|
||||
Key: `session_recordings/team_id/1/session_id/session_id_1/data/${firstTimestamp}-${lastTimestamp}`,
|
||||
}),
|
||||
})
|
||||
)
|
||||
})
|
||||
|
||||
it('does not flush a short session even when lagging if within threshold', async () => {
|
||||
|
Loading…
Reference in New Issue
Block a user