Mirror of https://github.com/PostHog/posthog.git, synced 2024-12-01 12:21:02 +01:00
fix: Recording ingestion metrics with proper lag metric (#17048)
This commit is contained in:
parent 2831b4c64f
commit a8ade37de5
@@ -53,6 +53,13 @@ const gaugeLagMilliseconds = new Gauge({
     labelNames: ['partition'],
 })
 
+// NOTE: This gauge is important! It is used as our primary metric for scaling up / down
+const gaugeLag = new Gauge({
+    name: 'recording_blob_ingestion_lag',
+    help: 'A gauge of the lag in messages, taking into account in progress messages',
+    labelNames: ['partition'],
+})
+
 const gaugeOffsetCommitted = new Gauge({
     name: 'offset_manager_offset_committed',
     help: 'When a session manager flushes to S3 it reports which offset on the partition it flushed.',
@@ -77,6 +84,12 @@ const counterKafkaMessageReceived = new Counter({
     labelNames: ['partition'],
 })
 
+type PartitionMetrics = {
+    lastMessageTimestamp?: number
+    lastMessageOffset?: number
+    lastKnownCommit?: number
+}
+
 export class SessionRecordingIngesterV2 {
     sessions: Record<string, SessionManager> = {}
     offsetHighWaterMarker: OffsetHighWaterMarker
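The new PartitionMetrics type consolidates the old partitionNow and partitionLastKnownCommit records, plus the newly tracked last message offset, into a single per-partition map. A minimal sketch of how the consolidated state is written, with illustrative values only:

    // Sketch: per-partition state lives in one map instead of parallel records.
    const partitionAssignments: Record<number, PartitionMetrics> = {}

    // A newly assigned partition starts with empty metrics...
    partitionAssignments[1] = {}

    // ...and each consumed message fills them in.
    const metrics = partitionAssignments[1]
    if (metrics) {
        metrics.lastMessageTimestamp = Date.now()
        metrics.lastMessageOffset = 42
        metrics.lastKnownCommit = metrics.lastKnownCommit ?? 42
    }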
@@ -84,10 +97,9 @@ export class SessionRecordingIngesterV2 {
     replayEventsIngester: ReplayEventsIngester
     batchConsumer?: BatchConsumer
     flushInterval: NodeJS.Timer | null = null
-    // the time at the most recent message of a particular partition
-    partitionNow: Record<number, number | null> = {}
-    partitionLastKnownCommit: Record<number, number | null> = {}
+    partitionAssignments: Record<number, PartitionMetrics> = {}
     teamsRefresher: BackgroundRefresher<Record<string, TeamId>>
+    offsetsRefresher: BackgroundRefresher<Record<number, number>>
     recordingConsumerConfig: PluginsServerConfig
 
     constructor(
@@ -116,6 +128,35 @@ export class SessionRecordingIngesterV2 {
                 throw e
             }
         })
+
+        this.offsetsRefresher = new BackgroundRefresher(async () => {
+            const results = await Promise.all(
+                Object.keys(this.partitionAssignments).map(async (partition) => {
+                    return new Promise<[number, number]>((resolve, reject) => {
+                        if (!this.batchConsumer) {
+                            return reject('Not connected')
+                        }
+                        this.batchConsumer.consumer.queryWatermarkOffsets(
+                            KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
+                            parseInt(partition),
+                            (err, offsets) => {
+                                if (err) {
+                                    status.error('🔥', 'Failed to query kafka watermark offsets', err)
+                                    return reject()
+                                }
+
+                                resolve([parseInt(partition), offsets.highOffset])
+                            }
+                        )
+                    })
+                })
+            )
+
+            return results.reduce((acc, [partition, highOffset]) => {
+                acc[partition] = highOffset
+                return acc
+            }, {} as Record<number, number>)
+        }, 5000)
     }
 
     public async consume(event: IncomingRecordingMessage, sentrySpan?: Sentry.Span): Promise<void> {
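BackgroundRefresher is a PostHog-internal helper, so its implementation is not shown in this diff; from its use above it appears to cache the result of an async function and re-run it at most every intervalMs milliseconds (5000 here), with refresh() available to force an update. A rough sketch of that assumed contract:

    // Rough sketch of the assumed BackgroundRefresher contract; not the real implementation.
    class BackgroundRefresherSketch<T> {
        private value: T | undefined
        private lastRefresh = 0

        constructor(private refreshFn: () => Promise<T>, private intervalMs = 5000) {}

        // Return the cached value, refreshing first if it is missing or stale.
        async get(): Promise<T> {
            if (this.value === undefined || Date.now() - this.lastRefresh > this.intervalMs) {
                await this.refresh()
            }
            return this.value as T
        }

        // Force an immediate refresh, e.g. after a rebalance changes the partition set.
        async refresh(): Promise<void> {
            this.value = await this.refreshFn()
            this.lastRefresh = Date.now()
        }
    }

Polling the watermarks this way keeps queryWatermarkOffsets off the per-message hot path: the consume loop calls get() and normally hits the cache.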
@@ -254,17 +295,30 @@ export class SessionRecordingIngesterV2 {
         for (const message of messages) {
             const { partition, offset, timestamp } = message
 
-            if (timestamp) {
+            if (timestamp && this.partitionAssignments[partition]) {
+                const metrics = this.partitionAssignments[partition]
+
                 // For some reason timestamp can be null. If it isn't, update our ingestion metrics
-                counterKafkaMessageReceived.inc({ partition })
-                this.partitionNow[partition] = timestamp
+                metrics.lastMessageTimestamp = timestamp
                 // If we don't have a last known commit then set it to this offset as we can't commit lower than that
-                this.partitionLastKnownCommit[partition] = this.partitionLastKnownCommit[partition] ?? offset
+                metrics.lastKnownCommit = metrics.lastKnownCommit ?? offset
+                metrics.lastMessageOffset = offset
+
+                counterKafkaMessageReceived.inc({ partition })
+
                 gaugeLagMilliseconds
                     .labels({
                         partition: partition.toString(),
                     })
                     .set(now() - timestamp)
+
+                // NOTE: This is an important metric used by the autoscaler
+                const offsetsByPartition = await this.offsetsRefresher.get()
+                const highOffset = offsetsByPartition[partition]
+
+                if (highOffset) {
+                    gaugeLag.set({ partition }, highOffset - metrics.lastMessageOffset)
+                }
             }
 
             const recordingMessage = await this.parseKafkaMessage(message)
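This is where the new message-based lag gauge is computed: the partition's high watermark (cached by offsetsRefresher) minus the offset of the last message this consumer processed. A worked example with made-up numbers:

    // Hypothetical numbers, purely to illustrate the computation above.
    const highOffset = 10500 // partition high watermark, via queryWatermarkOffsets
    const lastMessageOffset = 10340 // last offset this consumer has processed

    // 160 messages still to consume; this is the value the autoscaler watches.
    const lagInMessages = highOffset - lastMessageOffset // 10500 - 10340 = 160

Unlike gaugeLagMilliseconds, this value does not depend on producer timestamps, which (as the comment above notes) can be null.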
@@ -362,6 +416,13 @@ export class SessionRecordingIngesterV2 {
             * The assign_partitions indicates that the consumer group has new assignments.
             * We don't need to do anything, but it is useful to log for debugging.
             */
+
+            topicPartitions.forEach((topicPartition: TopicPartition) => {
+                this.partitionAssignments[topicPartition.partition] = {}
+            })
+
+            await this.offsetsRefresher.refresh()
+
             return
         }
 
@@ -380,22 +441,23 @@ export class SessionRecordingIngesterV2 {
                 revokedPartitions.includes(sessionManager.partition)
             )
 
-            await this.destroySessions(sessionsToDrop)
-
             gaugeSessionsRevoked.set(sessionsToDrop.length)
             gaugeSessionsHandled.remove()
 
             topicPartitions.forEach((topicPartition: TopicPartition) => {
                 const partition = topicPartition.partition
 
+                delete this.partitionAssignments[partition]
+                gaugeLag.remove({ partition })
                 gaugeLagMilliseconds.remove({ partition })
                 gaugeOffsetCommitted.remove({ partition })
                 gaugeOffsetCommitFailed.remove({ partition })
                 this.offsetHighWaterMarker.revoke(topicPartition)
-                this.partitionNow[partition] = null
-                this.partitionLastKnownCommit[partition] = null
             })
 
+            await this.destroySessions(sessionsToDrop)
+            await this.offsetsRefresher.refresh()
+
             return
         }
 
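On revocation, each per-partition series is deleted rather than zeroed. With prom-client, Gauge.remove() drops the time series for the given label set entirely, so Prometheus stops reporting a stale value for a partition this consumer no longer owns. A minimal illustration (hypothetical metric name):

    import { Gauge } from 'prom-client'

    const gauge = new Gauge({ name: 'example_lag', help: 'example', labelNames: ['partition'] })

    gauge.set({ partition: '1' }, 42) // creates the series example_lag{partition="1"}
    gauge.remove({ partition: '1' }) // deletes the series, not just its value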
@@ -430,7 +492,7 @@ export class SessionRecordingIngesterV2 {
         const promises: Promise<void>[] = []
         for (const [key, sessionManager] of Object.entries(this.sessions)) {
             // in practice, we will always have a value for latestKafkaMessageTimestamp,
-            const referenceTime = this.partitionNow[sessionManager.partition]
+            const referenceTime = this.partitionAssignments[sessionManager.partition]?.lastMessageTimestamp
             if (!referenceTime) {
                 status.warn('🤔', 'blob_ingester_consumer - no referenceTime for partition', {
                     partition: sessionManager.partition,
@@ -516,7 +578,7 @@ export class SessionRecordingIngesterV2 {
                 ? potentiallyBlockingOffset
                 : offset
 
-        const lastKnownCommit = this.partitionLastKnownCommit[partition] || 0
+        const lastKnownCommit = this.partitionAssignments[partition]?.lastKnownCommit || 0
         // TODO: Check how long we have been blocked by any individual session and if it is too long then we should
         // capture an exception to figure out why
         if (lastKnownCommit >= highestOffsetToCommit) {
@@ -524,7 +586,9 @@ export class SessionRecordingIngesterV2 {
             return
         }
 
-        this.partitionLastKnownCommit[partition] = highestOffsetToCommit
+        if (this.partitionAssignments[partition]) {
+            this.partitionAssignments[partition].lastKnownCommit = highestOffsetToCommit
+        }
 
         status.info('💾', `blob_ingester_consumer.commitOffsets - attempting to commit offset`, {
             partition,
@@ -135,8 +135,10 @@ describe('ingester', () => {
         await ingester.consume(event)
         expect(ingester.sessions['1-session_id_1']).toBeDefined()
         // Force the flush
-        ingester.partitionNow[event.metadata.partition] =
-            Date.now() + defaultConfig.SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS
+        ingester.partitionAssignments[event.metadata.partition] = {
+            lastMessageTimestamp: Date.now() + defaultConfig.SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS,
+        }
+
         await ingester.flushAllReadySessions(true)
 
         jest.runOnlyPendingTimers() // flush timer
@@ -182,6 +184,8 @@ describe('ingester', () => {
         })
 
         it('should commit higher values but not lower', async () => {
+            // We need to simulate the partition assignment logic here
+            ingester.partitionAssignments[1] = {}
             await ingester.consume(addMessage('sid1'))
             await ingester.sessions['1-sid1']?.flush('buffer_age')
             await tryToCommitLatestOffset()