mirror of https://github.com/PostHog/posthog.git

fix: lag is not even per partition (#15663)

Paul D'Ambra 2023-05-23 09:50:52 +01:00 committed by GitHub
parent 35522040cb
commit c665dda4da


@@ -59,6 +59,7 @@ export const gaugeBytesBuffered = new Gauge({
 export const gaugeLagMilliseconds = new Gauge({
     name: 'recording_blob_ingestion_lag_in_milliseconds',
     help: "A gauge of the lag in milliseconds, more useful than lag in messages since it affects how much work we'll be pushing to redis",
+    labelNames: ['partition'],
 })

 export class SessionRecordingBlobIngester {
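
Adding labelNames: ['partition'] turns the single lag gauge into one Prometheus time series per partition, which is what makes uneven lag visible. A minimal prom-client sketch of that pattern (the metric name and help text are the ones from the hunk above; the partition numbers and values are made up):

import { Gauge, register } from 'prom-client'

const gaugeLagMilliseconds = new Gauge({
    name: 'recording_blob_ingestion_lag_in_milliseconds',
    help: "A gauge of the lag in milliseconds, more useful than lag in messages since it affects how much work we'll be pushing to redis",
    labelNames: ['partition'],
})

// Each distinct label value becomes its own series, so per-partition lag shows up separately:
gaugeLagMilliseconds.labels('0').set(250)
gaugeLagMilliseconds.labels('7').set(98_000)

// The exposed metrics then look roughly like:
//   recording_blob_ingestion_lag_in_milliseconds{partition="0"} 250
//   recording_blob_ingestion_lag_in_milliseconds{partition="7"} 98000
register.metrics().then((output) => console.log(output))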
@@ -69,7 +70,7 @@ export class SessionRecordingBlobIngester {
     lastHeartbeat: number = Date.now()
     flushInterval: NodeJS.Timer | null = null
     enabledTeams: number[] | null
-    latestKafkaMessageTimestamp: number | null = null
+    latestKafkaMessageTimestamp: Record<number, number | null> = {}

     constructor(
         private teamManager: TeamManager,
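
The field change above swaps a single nullable timestamp for one slot per Kafka partition. A Record<number, number | null> is just a plain object keyed by the partition id; a tiny illustration with made-up values:

// One entry per Kafka partition; a partition has no entry until its first message is seen.
const latestKafkaMessageTimestamp: Record<number, number | null> = {}

latestKafkaMessageTimestamp[0] = 1684831852000 // epoch millis of the newest message seen on partition 0
latestKafkaMessageTimestamp[7] = 1684750000000 // partition 7's newest message is much older

// Reading a partition we have not heard from yet yields undefined, i.e. "no data yet".
console.log(latestKafkaMessageTimestamp[12]) // undefined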
@@ -139,8 +140,9 @@
         }

         // track the latest message timestamp seen so, we can use it to calculate a reference "now"
-        this.latestKafkaMessageTimestamp = message.timestamp
-        gaugeLagMilliseconds.set(DateTime.now().toMillis() - message.timestamp)
+        // lag does not distribute evenly across partitions, so track timestamps per partition
+        this.latestKafkaMessageTimestamp[message.partition] = message.timestamp
+        gaugeLagMilliseconds.labels(message.partition.toString()).set(DateTime.now().toMillis() - message.timestamp)

         let messagePayload: RawEventMessage
         let event: PipelineEvent
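
Per message, the new code does two things: it records the newest timestamp for that message's partition and reports the partition's lag against wall-clock time on the labelled gauge. A standalone sketch of the same bookkeeping, assuming the luxon DateTime and prom-client Gauge used in the surrounding file; the helper name and parameter shape are illustrative, not PostHog's:

import { DateTime } from 'luxon'
import { Gauge } from 'prom-client'

// Hypothetical helper mirroring the two added lines; the gauge and the per-partition
// timestamp map are the ones sketched earlier, passed in rather than read from `this`.
function trackPartitionLag(
    message: { partition: number; timestamp: number },
    latestKafkaMessageTimestamp: Record<number, number | null>,
    gaugeLagMilliseconds: Gauge<'partition'>
): void {
    // Remember the newest Kafka timestamp seen on this message's partition...
    latestKafkaMessageTimestamp[message.partition] = message.timestamp
    // ...and report that partition's lag relative to the current wall-clock time.
    gaugeLagMilliseconds
        .labels(message.partition.toString())
        .set(DateTime.now().toMillis() - message.timestamp)
}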
@@ -346,19 +348,19 @@
     private async checkEachSession() {
         let sessionManagerBufferSizes = 0

-        // in practice, we will always have a values for latestKaftaMessageTimestamp,
-        // but in case we get here before the first message, we use now
-        const kafkaNow = this.latestKafkaMessageTimestamp || DateTime.now().toMillis()
-        const flushThresholdMillis = this.flushThreshold(
-            kafkaNow,
-            DateTime.now().toMillis(),
-            this.serverConfig.SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS * 1000,
-            this.serverConfig.SESSION_RECORDING_MAX_BUFFER_AGE_MULTIPLIER
-        )
-
         for (const [_, sessionManager] of this.sessions) {
             sessionManagerBufferSizes += sessionManager.buffer.size

+            // in practice, we will always have a values for latestKaftaMessageTimestamp,
+            // but in case we get here before the first message, we use now
+            const kafkaNow = this.latestKafkaMessageTimestamp[sessionManager.partition] || DateTime.now().toMillis()
+            const flushThresholdMillis = this.flushThreshold(
+                kafkaNow,
+                DateTime.now().toMillis(),
+                this.serverConfig.SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS * 1000,
+                this.serverConfig.SESSION_RECORDING_MAX_BUFFER_AGE_MULTIPLIER
+            )
             await sessionManager.flushIfSessionBufferIsOld(kafkaNow, flushThresholdMillis).catch((err) => {
                 status.error(
                     '🚽',
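
This last hunk moves the computation of the reference now and the flush threshold inside the per-session loop, so each session is judged against its own partition's latest timestamp instead of one shared value. A minimal sketch of the loop under that reading; SessionManagerLike, the flushThreshold callback and the config arguments are stand-ins for the real class members:

import { DateTime } from 'luxon'

// Stand-in for the real SessionManager: only the members this loop touches.
interface SessionManagerLike {
    partition: number
    buffer: { size: number }
    flushIfSessionBufferIsOld(referenceNow: number, flushThresholdMillis: number): Promise<void>
}

async function checkEachSession(
    sessions: Map<string, SessionManagerLike>,
    latestKafkaMessageTimestamp: Record<number, number | null>,
    flushThreshold: (kafkaNow: number, now: number, maxBufferAgeMillis: number, multiplier: number) => number,
    maxBufferAgeSeconds: number,
    maxBufferAgeMultiplier: number
): Promise<void> {
    // Accumulated here as in the original method; the real code uses it further down (not shown in the excerpt).
    let sessionManagerBufferSizes = 0
    for (const [, sessionManager] of sessions) {
        sessionManagerBufferSizes += sessionManager.buffer.size
        // Before the first message on a partition there is no timestamp yet, so fall back to "now",
        // as the comment in the diff describes.
        const kafkaNow = latestKafkaMessageTimestamp[sessionManager.partition] || DateTime.now().toMillis()
        const flushThresholdMillis = flushThreshold(
            kafkaNow,
            DateTime.now().toMillis(),
            maxBufferAgeSeconds * 1000,
            maxBufferAgeMultiplier
        )
        await sessionManager.flushIfSessionBufferIsOld(kafkaNow, flushThresholdMillis).catch(() => {
            // the real code logs the failure with status.error; omitted in this sketch
        })
    }
}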