mirror of
https://github.com/PostHog/posthog.git
synced 2024-11-25 02:49:32 +01:00
feat: Optimised blob storage team loading (#16486)
This commit is contained in:
parent
a99420b261
commit
cd2f7f398a
@ -10,15 +10,17 @@ import {
|
||||
TopicPartition,
|
||||
} from 'node-rdkafka-acosom'
|
||||
import path from 'path'
|
||||
import { Pool } from 'pg'
|
||||
import { Gauge } from 'prom-client'
|
||||
|
||||
import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics'
|
||||
import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
|
||||
import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
|
||||
import { createKafkaProducer, disconnectProducer } from '../../../kafka/producer'
|
||||
import { PipelineEvent, PluginsServerConfig, RawEventMessage, RedisPool, Team } from '../../../types'
|
||||
import { PipelineEvent, PluginsServerConfig, RawEventMessage, RedisPool, TeamId } from '../../../types'
|
||||
import { BackgroundRefresher } from '../../../utils/background-refresher'
|
||||
import { status } from '../../../utils/status'
|
||||
import { TeamManager } from '../../../worker/ingestion/team-manager'
|
||||
import { fetchTeamTokensWithRecordings } from '../../../worker/ingestion/team-manager'
|
||||
import { ObjectStorage } from '../../services/object_storage'
|
||||
import { eventDroppedCounter } from '../metrics'
|
||||
import { RealtimeManager } from './blob-ingester/realtime-manager'
|
||||
@ -82,10 +84,11 @@ export class SessionRecordingBlobIngester {
|
||||
flushInterval: NodeJS.Timer | null = null
|
||||
// the time at the most recent message of a particular partition
|
||||
partitionNow: Record<number, number | null> = {}
|
||||
teamsRefresher: BackgroundRefresher<Record<string, TeamId>>
|
||||
|
||||
constructor(
|
||||
private teamManager: TeamManager,
|
||||
private serverConfig: PluginsServerConfig,
|
||||
private postgres: Pool,
|
||||
private objectStorage: ObjectStorage,
|
||||
private redisPool: RedisPool
|
||||
) {
|
||||
@ -100,6 +103,17 @@ export class SessionRecordingBlobIngester {
|
||||
this.redisPool,
|
||||
serverConfig.SESSION_RECORDING_REDIS_OFFSET_STORAGE_KEY
|
||||
)
|
||||
|
||||
this.teamsRefresher = new BackgroundRefresher(async () => {
|
||||
try {
|
||||
status.info('🔁', 'blob_ingester_consumer - refreshing teams in the background')
|
||||
return await fetchTeamTokensWithRecordings(this.postgres)
|
||||
} catch (e) {
|
||||
status.error('🔥', 'blob_ingester_consumer - failed to refresh teams in the background', e)
|
||||
captureException(e)
|
||||
throw e
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
public async consume(event: IncomingRecordingMessage, sentrySpan?: Sentry.Span): Promise<void> {
|
||||
@ -208,33 +222,29 @@ export class SessionRecordingBlobIngester {
|
||||
return statusWarn('no_token')
|
||||
}
|
||||
|
||||
let team: Team | null = null
|
||||
let teamId: TeamId | null = null
|
||||
const token = messagePayload.token
|
||||
|
||||
const teamSpan = span?.startChild({
|
||||
op: 'fetchTeam',
|
||||
})
|
||||
if (messagePayload.team_id != null) {
|
||||
team = await this.teamManager.fetchTeam(messagePayload.team_id)
|
||||
} else if (messagePayload.token) {
|
||||
team = await this.teamManager.getTeamByToken(messagePayload.token)
|
||||
if (token) {
|
||||
teamId = await this.teamsRefresher.get().then((teams) => teams[token] || null)
|
||||
}
|
||||
teamSpan?.finish()
|
||||
|
||||
if (team == null) {
|
||||
return statusWarn('team_not_found', {
|
||||
teamId: messagePayload.team_id,
|
||||
payloadTeamSource: messagePayload.team_id ? 'team' : messagePayload.token ? 'token' : 'unknown',
|
||||
})
|
||||
}
|
||||
|
||||
if (!team.session_recording_opt_in) {
|
||||
if (teamId == null) {
|
||||
eventDroppedCounter
|
||||
.labels({
|
||||
event_type: 'session_recordings_blob_ingestion',
|
||||
drop_cause: 'disabled',
|
||||
drop_cause: 'team_missing_or_disabled',
|
||||
})
|
||||
.inc()
|
||||
return
|
||||
|
||||
return statusWarn('team_missing_or_disabled', {
|
||||
teamId: messagePayload.team_id,
|
||||
payloadTeamSource: messagePayload.team_id ? 'team' : messagePayload.token ? 'token' : 'unknown',
|
||||
})
|
||||
}
|
||||
|
||||
const recordingMessage: IncomingRecordingMessage = {
|
||||
@ -245,7 +255,7 @@ export class SessionRecordingBlobIngester {
|
||||
timestamp: message.timestamp,
|
||||
},
|
||||
|
||||
team_id: team.id,
|
||||
team_id: teamId,
|
||||
distinct_id: event.distinct_id,
|
||||
session_id: event.properties?.$session_id,
|
||||
window_id: event.properties?.$window_id,
|
||||
@ -291,6 +301,8 @@ export class SessionRecordingBlobIngester {
|
||||
throw e
|
||||
}
|
||||
await this.realtimeManager.subscribe()
|
||||
// Load teams into memory
|
||||
await this.teamsRefresher.refresh()
|
||||
|
||||
const connectionConfig = createRdConnectionConfigFromEnvVars(this.serverConfig)
|
||||
this.producer = await createKafkaProducer(connectionConfig)
|
||||
|
@ -436,14 +436,13 @@ export async function startPluginsServer(
|
||||
if (capabilities.sessionRecordingBlobIngestion) {
|
||||
const blobServerConfig = sessionRecordingBlobConsumerConfig(serverConfig)
|
||||
const postgres = hub?.postgres ?? createPostgresPool(blobServerConfig.DATABASE_URL)
|
||||
const teamManager = hub?.teamManager ?? new TeamManager(postgres, blobServerConfig)
|
||||
const s3 = hub?.objectStorage ?? getObjectStorage(blobServerConfig)
|
||||
const redisPool = hub?.db.redisPool ?? createRedisPool(blobServerConfig)
|
||||
|
||||
if (!s3) {
|
||||
throw new Error("Can't start session recording blob ingestion without object storage")
|
||||
}
|
||||
const ingester = new SessionRecordingBlobIngester(teamManager, blobServerConfig, s3, redisPool)
|
||||
const ingester = new SessionRecordingBlobIngester(blobServerConfig, postgres, s3, redisPool)
|
||||
await ingester.start()
|
||||
const batchConsumer = ingester.batchConsumer
|
||||
if (batchConsumer) {
|
||||
|
42
plugin-server/src/utils/background-refresher.ts
Normal file
42
plugin-server/src/utils/background-refresher.ts
Normal file
@ -0,0 +1,42 @@
|
||||
import { status } from './status'
|
||||
|
||||
// A background refresher will act like a TTL cache but choosing to refresh the value in the background rather than
|
||||
// dropping the data or blocking the request.
|
||||
export class BackgroundRefresher<T> {
|
||||
private cachedValue: T | undefined = undefined
|
||||
private cachedValuePromise: Promise<T> | null = null
|
||||
private lastRefreshTime = 0
|
||||
|
||||
constructor(private readonly refreshFunction: () => Promise<T>, private readonly maxAgeMs: number = 1000 * 60) {}
|
||||
|
||||
public async refresh(): Promise<T> {
|
||||
if (this.cachedValuePromise) {
|
||||
return this.cachedValuePromise
|
||||
}
|
||||
try {
|
||||
this.cachedValuePromise = this.refreshFunction()
|
||||
this.cachedValue = await this.cachedValuePromise
|
||||
} catch (e) {
|
||||
status.error('BackgroundRefresher: Error refreshing background task', e)
|
||||
throw e
|
||||
} finally {
|
||||
this.cachedValuePromise = null
|
||||
this.lastRefreshTime = Date.now()
|
||||
}
|
||||
|
||||
return this.cachedValue
|
||||
}
|
||||
|
||||
public async get(): Promise<T> {
|
||||
if (!this.cachedValue) {
|
||||
await this.refresh()
|
||||
}
|
||||
|
||||
if (Date.now() - this.lastRefreshTime > this.maxAgeMs) {
|
||||
// We trigger the refresh but we don't use it
|
||||
void this.refresh()
|
||||
}
|
||||
|
||||
return this.cachedValue!
|
||||
}
|
||||
}
|
@ -192,3 +192,21 @@ export async function fetchTeamByToken(client: Client | Pool, token: string): Pr
|
||||
)
|
||||
return selectResult.rows[0] ?? null
|
||||
}
|
||||
|
||||
export async function fetchTeamTokensWithRecordings(client: Client | Pool): Promise<Record<string, TeamId>> {
|
||||
const selectResult = await postgresQuery<Pick<Team, 'id' | 'api_token'>>(
|
||||
client,
|
||||
`
|
||||
SELECT id, api_token
|
||||
FROM posthog_team
|
||||
WHERE session_recording_opt_in = true
|
||||
`,
|
||||
[],
|
||||
'fetchTeamTokensWithRecordings'
|
||||
)
|
||||
|
||||
return selectResult.rows.reduce((acc, row) => {
|
||||
acc[row.api_token] = row.id
|
||||
return acc
|
||||
}, {} as Record<string, TeamId>)
|
||||
}
|
||||
|
@ -48,7 +48,7 @@ describe('ingester rebalancing tests', () => {
|
||||
})
|
||||
|
||||
it('rebalances partitions safely from one to two consumers', async () => {
|
||||
ingesterOne = new SessionRecordingBlobIngester(hub.teamManager, config, hub.objectStorage, hub.redisPool)
|
||||
ingesterOne = new SessionRecordingBlobIngester(config, hub.postgres, hub.objectStorage, hub.redisPool)
|
||||
|
||||
await ingesterOne.start()
|
||||
|
||||
@ -59,7 +59,7 @@ describe('ingester rebalancing tests', () => {
|
||||
assertIngesterHasExpectedPartitions(ingesterOne, [1])
|
||||
})
|
||||
|
||||
ingesterTwo = new SessionRecordingBlobIngester(hub.teamManager, config, hub.objectStorage, hub.redisPool)
|
||||
ingesterTwo = new SessionRecordingBlobIngester(config, hub.postgres, hub.objectStorage, hub.redisPool)
|
||||
await ingesterTwo.start()
|
||||
|
||||
await waitForExpect(() => {
|
||||
|
@ -84,11 +84,11 @@ describe('ingester', () => {
|
||||
// these tests assume that a flush won't run while they run
|
||||
beforeEach(async () => {
|
||||
ingester = new SessionRecordingBlobIngester(
|
||||
hub.teamManager,
|
||||
{
|
||||
...defaultConfig,
|
||||
SESSION_RECORDING_REDIS_OFFSET_STORAGE_KEY: keyPrefix,
|
||||
},
|
||||
hub.postgres,
|
||||
hub.objectStorage,
|
||||
hub.redisPool
|
||||
)
|
||||
|
82
plugin-server/tests/utils/background-refresher.test.ts
Normal file
82
plugin-server/tests/utils/background-refresher.test.ts
Normal file
@ -0,0 +1,82 @@
|
||||
import { BackgroundRefresher } from '../../src/utils/background-refresher'
|
||||
|
||||
const realNow = Date.now
|
||||
|
||||
describe('getNextRetryMs', () => {
|
||||
jest.useFakeTimers()
|
||||
jest.setTimeout(1000)
|
||||
let refreshFunction = jest.fn()
|
||||
let refresher = new BackgroundRefresher(refreshFunction)
|
||||
|
||||
beforeEach(() => {
|
||||
refreshFunction = jest.fn()
|
||||
refresher = new BackgroundRefresher(refreshFunction, 100)
|
||||
})
|
||||
|
||||
beforeAll(() => {
|
||||
global.Date.now = jest.fn(() => new Date('2019-04-07T10:20:30Z').getTime())
|
||||
})
|
||||
|
||||
afterAll(() => {
|
||||
global.Date.now = realNow
|
||||
})
|
||||
|
||||
it('simple gets', async () => {
|
||||
refreshFunction.mockResolvedValue('foo')
|
||||
await expect(refresher.get()).resolves.toEqual('foo')
|
||||
refreshFunction.mockResolvedValue('foo2')
|
||||
await expect(refresher.get()).resolves.toEqual('foo')
|
||||
await expect(refresher.refresh()).resolves.toEqual('foo2')
|
||||
await expect(refresher.get()).resolves.toEqual('foo2')
|
||||
})
|
||||
|
||||
it('only one call per refresh', async () => {
|
||||
refreshFunction.mockImplementation(async () => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 100))
|
||||
return 'foo'
|
||||
})
|
||||
|
||||
const promises: Promise<any>[] = []
|
||||
|
||||
expect(refreshFunction).toHaveBeenCalledTimes(0)
|
||||
promises.push(refresher.get())
|
||||
expect(refreshFunction).toHaveBeenCalledTimes(1)
|
||||
promises.push(refresher.get())
|
||||
promises.push(refresher.get())
|
||||
expect(refreshFunction).toHaveBeenCalledTimes(1)
|
||||
jest.runOnlyPendingTimers()
|
||||
|
||||
expect(await Promise.all(promises)).toEqual(['foo', 'foo', 'foo'])
|
||||
})
|
||||
|
||||
it('refreshes in the background', async () => {
|
||||
let count = 1
|
||||
let timeAdavance = 0
|
||||
global.Date.now = jest.fn(() => realNow() + timeAdavance)
|
||||
refresher = new BackgroundRefresher(refreshFunction, 10000)
|
||||
|
||||
refreshFunction.mockImplementation(async () => {
|
||||
await new Promise((r) => setTimeout(r, 1000))
|
||||
return 'foo' + count++
|
||||
})
|
||||
|
||||
// First time we need to trigger the timer before awaiting as it waits for the value
|
||||
let response = refresher.get()
|
||||
jest.runOnlyPendingTimers()
|
||||
await expect(response).resolves.toEqual('foo1')
|
||||
await expect(refresher.get()).resolves.toEqual('foo1')
|
||||
expect(refreshFunction).toHaveBeenCalledTimes(1)
|
||||
// Advance time forward by more than the refresh interval
|
||||
timeAdavance = 10000 + 1000
|
||||
// This will trigger the background refresh
|
||||
response = refresher.get()
|
||||
// which we resolve the timeout for
|
||||
jest.runOnlyPendingTimers()
|
||||
// the original call gets the old value as it doesn't wait
|
||||
await expect(response).resolves.toEqual('foo1')
|
||||
expect(refreshFunction).toHaveBeenCalledTimes(2)
|
||||
// the next call gets the new value
|
||||
await expect(refresher.get()).resolves.toEqual('foo2')
|
||||
expect(refreshFunction).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
})
|
Loading…
Reference in New Issue
Block a user